Resurrect parallel distributed insert select with s3Cluster (#41535)

Nikita Mikhaylov 2022-10-06 13:47:32 +02:00 committed by GitHub
parent 80ebbfbc56
commit 860e34e760
12 changed files with 528 additions and 90 deletions
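
For orientation, a hedged sketch of the query shape this commit re-enables (the table, cluster, and bucket names are taken from the integration test added in this commit and are otherwise arbitrary): with parallel_distributed_insert_select, the initiator no longer pulls all S3 data through itself; each node of the destination cluster reads its own share of the objects, coordinated by a shared task iterator.

-- Parallel INSERT SELECT from s3Cluster into a Distributed table (names from the test below):
INSERT INTO insert_select_distributed
SELECT * FROM s3Cluster(
    'cluster_simple',
    'http://minio1:9001/root/data/generated/*.csv',
    'minio', 'minio123', 'CSV', 'a String, b UInt64')
SETTINGS parallel_distributed_insert_select = 1, insert_distributed_sync = 1;

The files below wire this up for Distributed and ReplicatedMergeTree destinations and for the s3Cluster/hdfsCluster sources.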


@ -331,7 +331,7 @@ BlockIO InterpreterInsertQuery::execute()
if (!query.table_function)
getContext()->checkAccess(AccessType::INSERT, query.table_id, query_sample_block.getNames());
if (query.select && table->isRemote() && settings.parallel_distributed_insert_select)
if (query.select && settings.parallel_distributed_insert_select)
// Distributed INSERT SELECT
distributed_pipeline = table->distributedWrite(query, getContext());


@ -1,4 +1,5 @@
#include <Common/config.h>
#include "Interpreters/Context_fwd.h"
#if USE_HDFS
@ -41,7 +42,7 @@ StorageHDFSCluster::StorageHDFSCluster(
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & compression_method_)
: IStorage(table_id_)
: IStorageCluster(table_id_)
, cluster_name(cluster_name_)
, uri(uri_)
, format_name(format_name_)
@ -74,13 +75,8 @@ Pipe StorageHDFSCluster::read(
size_t /*max_block_size*/,
unsigned /*num_streams*/)
{
auto cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
auto iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context, uri);
auto callback = std::make_shared<HDFSSource::IteratorWrapper>([iterator]() mutable -> String
{
return iterator->next();
});
auto cluster = getCluster(context);
auto extension = getTaskIteratorExtension(query_info.query, context);
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
Block header =
@ -117,7 +113,7 @@ Pipe StorageHDFSCluster::read(
scalars,
Tables(),
processed_stage,
RemoteQueryExecutor::Extension{.task_iterator = callback});
extension);
pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, add_agg_info, false));
}
@ -140,6 +136,18 @@ QueryProcessingStage::Enum StorageHDFSCluster::getQueryProcessingStage(
}
ClusterPtr StorageHDFSCluster::getCluster(ContextPtr context) const
{
return context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
}
RemoteQueryExecutor::Extension StorageHDFSCluster::getTaskIteratorExtension(ASTPtr, ContextPtr context) const
{
auto iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context, uri);
auto callback = std::make_shared<HDFSSource::IteratorWrapper>([iter = std::move(iterator)]() mutable -> String { return iter->next(); });
return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
}
NamesAndTypesList StorageHDFSCluster::getVirtuals() const
{
return NamesAndTypesList{


@ -9,6 +9,7 @@
#include <Client/Connection.h>
#include <Interpreters/Cluster.h>
#include <Storages/IStorageCluster.h>
#include <Storages/HDFS/StorageHDFS.h>
namespace DB
@ -16,7 +17,7 @@ namespace DB
class Context;
class StorageHDFSCluster : public IStorage
class StorageHDFSCluster : public IStorageCluster
{
public:
StorageHDFSCluster(
@ -39,6 +40,9 @@ public:
NamesAndTypesList getVirtuals() const override;
ClusterPtr getCluster(ContextPtr context) const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, ContextPtr context) const override;
private:
String cluster_name;
String uri;


@ -0,0 +1,29 @@
#pragma once
#include <Storages/IStorage.h>
#include <Interpreters/Cluster.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
namespace DB
{
/**
* Base class for *Cluster storages used in table functions like s3Cluster and hdfsCluster
* Needed for code simplification around parallel_distributed_insert_select
*/
class IStorageCluster : public IStorage
{
public:
explicit IStorageCluster(const StorageID & table_id_) : IStorage(table_id_) {}
virtual ClusterPtr getCluster(ContextPtr context) const = 0;
/// Query is needed for pruning by virtual columns (_file, _path)
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, ContextPtr context) const = 0;
bool isRemote() const override { return true; }
};
}


@ -59,6 +59,8 @@
#include <TableFunctions/TableFunctionView.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Storages/IStorageCluster.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
@ -759,55 +761,35 @@ SinkToStoragePtr StorageDistributed::write(const ASTPtr &, const StorageMetadata
}
std::optional<QueryPipeline> StorageDistributed::distributedWrite(const ASTInsertQuery & query, ContextPtr local_context)
std::optional<QueryPipeline> StorageDistributed::distributedWriteBetweenDistributedTables(const StorageDistributed & src_distributed, const ASTInsertQuery & query, ContextPtr local_context) const
{
QueryPipeline pipeline;
const Settings & settings = local_context->getSettingsRef();
if (settings.max_distributed_depth && local_context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
std::shared_ptr<StorageDistributed> storage_src;
auto & select = query.select->as<ASTSelectWithUnionQuery &>();
const auto & settings = local_context->getSettingsRef();
auto new_query = std::dynamic_pointer_cast<ASTInsertQuery>(query.clone());
if (select.list_of_selects->children.size() == 1)
/// Unwrap view() function.
if (src_distributed.remote_table_function_ptr)
{
if (auto * select_query = select.list_of_selects->children.at(0)->as<ASTSelectQuery>())
{
JoinedTables joined_tables(Context::createCopy(local_context), *select_query);
const TableFunctionPtr src_table_function =
TableFunctionFactory::instance().get(src_distributed.remote_table_function_ptr, local_context);
const TableFunctionView * view_function =
assert_cast<const TableFunctionView *>(src_table_function.get());
new_query->select = view_function->getSelectQuery().clone();
}
else
{
const auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();
select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>();
if (joined_tables.tablesCount() == 1)
{
storage_src = std::dynamic_pointer_cast<StorageDistributed>(joined_tables.getLeftTableStorage());
if (storage_src)
{
/// Unwrap view() function.
if (storage_src->remote_table_function_ptr)
{
const TableFunctionPtr src_table_function =
TableFunctionFactory::instance().get(storage_src->remote_table_function_ptr, local_context);
const TableFunctionView * view_function =
assert_cast<const TableFunctionView *>(src_table_function.get());
new_query->select = view_function->getSelectQuery().clone();
}
else
{
const auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();
select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>();
auto * select = query.select->as<ASTSelectWithUnionQuery &>().list_of_selects->children.at(0)->as<ASTSelectQuery>();
auto new_select_query = std::dynamic_pointer_cast<ASTSelectQuery>(select->clone());
select_with_union_query->list_of_selects->children.push_back(new_select_query);
auto new_select_query = std::dynamic_pointer_cast<ASTSelectQuery>(select_query->clone());
select_with_union_query->list_of_selects->children.push_back(new_select_query);
new_select_query->replaceDatabaseAndTable(src_distributed.getRemoteDatabaseName(), src_distributed.getRemoteTableName());
new_select_query->replaceDatabaseAndTable(storage_src->getRemoteDatabaseName(), storage_src->getRemoteTableName());
new_query->select = select_with_union_query;
}
}
}
}
new_query->select = select_with_union_query;
}
const Cluster::AddressesWithFailover & src_addresses = storage_src ? storage_src->getCluster()->getShardsAddresses() : Cluster::AddressesWithFailover{};
const Cluster::AddressesWithFailover & src_addresses = src_distributed.getCluster()->getShardsAddresses();
const Cluster::AddressesWithFailover & dst_addresses = getCluster()->getShardsAddresses();
/// Compare addresses instead of cluster name, to handle remote()/cluster().
/// (since for remote()/cluster() the getClusterName() is empty string)
@ -822,7 +804,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWrite(const ASTInser
LOG_WARNING(log,
"Parallel distributed INSERT SELECT is not possible "
"(source cluster={} ({} addresses), destination cluster={} ({} addresses))",
storage_src ? storage_src->getClusterName() : "<not a Distributed table>",
src_distributed.getClusterName(),
src_addresses.size(),
getClusterName(),
dst_addresses.size());
@ -849,6 +831,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWrite(const ASTInser
new_query_str = buf.str();
}
QueryPipeline pipeline;
ContextMutablePtr query_context = Context::createCopy(local_context);
++query_context->getClientInfo().distributed_depth;
@ -882,6 +865,120 @@ std::optional<QueryPipeline> StorageDistributed::distributedWrite(const ASTInser
}
std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStorage(const IStorageCluster & src_storage_cluster, const ASTInsertQuery & query, ContextPtr local_context) const
{
const auto & settings = local_context->getSettingsRef();
auto & select = query.select->as<ASTSelectWithUnionQuery &>();
/// The select query is needed for pruning by virtual columns (_file, _path)
auto extension = src_storage_cluster.getTaskIteratorExtension(
select.list_of_selects->children.at(0)->as<ASTSelectQuery>()->clone(),
local_context);
auto dst_cluster = getCluster();
auto new_query = std::dynamic_pointer_cast<ASTInsertQuery>(query.clone());
if (settings.parallel_distributed_insert_select == PARALLEL_DISTRIBUTED_INSERT_SELECT_ALL)
{
new_query->table_id = StorageID(getRemoteDatabaseName(), getRemoteTableName());
/// Reset table function for INSERT INTO remote()/cluster()
new_query->table_function.reset();
}
String new_query_str;
{
WriteBufferFromOwnString buf;
IAST::FormatSettings ast_format_settings(buf, /*one_line*/ true);
ast_format_settings.always_quote_identifiers = true;
new_query->IAST::format(ast_format_settings);
new_query_str = buf.str();
}
QueryPipeline pipeline;
ContextMutablePtr query_context = Context::createCopy(local_context);
++query_context->getClientInfo().distributed_depth;
/// Here we take addresses from the destination cluster and assume the source table exists on these nodes
for (const auto & replicas : getCluster()->getShardsAddresses())
{
/// There will be only one replica, because we consider each replica as a shard
for (const auto & node : replicas)
{
auto connection = std::make_shared<Connection>(
node.host_name, node.port, query_context->getGlobalContext()->getCurrentDatabase(),
node.user, node.password, node.quota_key, node.cluster, node.cluster_secret,
"ParallelInsertSelectInititiator",
node.compression,
node.secure
);
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
connection,
new_query_str,
Block{},
query_context,
/*throttler=*/nullptr,
Scalars{},
Tables{},
QueryProcessingStage::Complete,
extension);
QueryPipeline remote_pipeline(std::make_shared<RemoteSource>(remote_query_executor, false, settings.async_socket_for_remote));
remote_pipeline.complete(std::make_shared<EmptySink>(remote_query_executor->getHeader()));
pipeline.addCompletedPipeline(std::move(remote_pipeline));
}
}
return pipeline;
}
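
To make the rewrite above concrete, a hedged sketch of what each node of the destination cluster receives in the ALL mode of the setting (the branch that resets the table function); names are borrowed from the integration test and the exact serialized form may differ. The Distributed target is swapped for its remote database and table, the s3Cluster source stays in the SELECT, and the initiator's task iterator hands out object names so that each file is read exactly once.

-- Query received by the initiator:
--   INSERT INTO insert_select_distributed SELECT * FROM s3Cluster('cluster_simple', ...)
-- Roughly what each destination node then executes:
INSERT INTO default.insert_select_local
SELECT * FROM s3Cluster(
    'cluster_simple',
    'http://minio1:9001/root/data/generated/*.csv',
    'minio', 'minio123', 'CSV', 'a String, b UInt64');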
std::optional<QueryPipeline> StorageDistributed::distributedWrite(const ASTInsertQuery & query, ContextPtr local_context)
{
const Settings & settings = local_context->getSettingsRef();
if (settings.max_distributed_depth && local_context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
auto & select = query.select->as<ASTSelectWithUnionQuery &>();
StoragePtr src_storage;
/// Distributed write works only in the simplest case: INSERT ... SELECT
/// without any UNIONs or JOINs on the right-hand side
if (select.list_of_selects->children.size() == 1)
{
if (auto * select_query = select.list_of_selects->children.at(0)->as<ASTSelectQuery>())
{
JoinedTables joined_tables(Context::createCopy(local_context), *select_query);
if (joined_tables.tablesCount() == 1)
{
src_storage = joined_tables.getLeftTableStorage();
}
}
}
if (!src_storage)
return {};
if (auto src_distributed = std::dynamic_pointer_cast<StorageDistributed>(src_storage))
{
return distributedWriteBetweenDistributedTables(*src_distributed, query, local_context);
}
if (auto src_storage_cluster = std::dynamic_pointer_cast<IStorageCluster>(src_storage))
{
return distributedWriteFromClusterStorage(*src_storage_cluster, query, local_context);
}
if (local_context->getClientInfo().distributed_depth == 0)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parallel distributed INSERT SELECT is not possible. "\
"Reason: distributed reading is supported only from Distributed engine or *Cluster table functions, but got {} storage", src_storage->getName());
}
return {};
}
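
A hedged illustration of the dispatch in distributedWrite() above (hypothetical table names): the parallel path is considered only for a single plain SELECT whose sole source table is another Distributed table or an IStorageCluster such as s3Cluster/hdfsCluster; queries with UNIONs or JOINs fall back to the ordinary INSERT SELECT pipeline, while a single unsupported source makes the initiator throw.

-- Takes the parallel path (single SELECT, single *Cluster source):
INSERT INTO dist_table
SELECT a, b FROM s3Cluster('cluster_simple', 'http://minio1:9001/root/data/*.csv',
                           'minio', 'minio123', 'CSV', 'a String, b UInt64')
SETTINGS parallel_distributed_insert_select = 1;

-- Falls back to the regular pipeline (JOIN on the right-hand side):
INSERT INTO dist_table
SELECT l.a, r.b
FROM another_dist_table AS l
JOIN local_table AS r ON l.a = r.a
SETTINGS parallel_distributed_insert_select = 1;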
void StorageDistributed::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const
{
auto name_deps = getDependentViewsByColumn(local_context);


@ -1,6 +1,7 @@
#pragma once
#include <Storages/IStorage.h>
#include <Storages/IStorageCluster.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Storages/Distributed/DistributedSettings.h>
#include <Storages/getStructureOfRemoteTable.h>
@ -207,6 +208,9 @@ private:
void delayInsertOrThrowIfNeeded() const;
std::optional<QueryPipeline> distributedWriteFromClusterStorage(const IStorageCluster & src_storage_cluster, const ASTInsertQuery & query, ContextPtr context) const;
std::optional<QueryPipeline> distributedWriteBetweenDistributedTables(const StorageDistributed & src_distributed, const ASTInsertQuery & query, ContextPtr context) const;
String remote_database;
String remote_table;
ASTPtr remote_table_function_ptr;


@ -46,11 +46,13 @@
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTOptimizeQuery.h>
#include <Parsers/ASTPartition.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTCheckQuery.h>
#include <Parsers/ExpressionListParsers.h>
@ -60,6 +62,7 @@
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/Sinks/EmptySink.h>
#include <IO/ReadBufferFromString.h>
#include <IO/Operators.h>
@ -74,6 +77,7 @@
#include <Interpreters/InterserverCredentials.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/JoinedTables.h>
#include <Backups/BackupEntriesCollector.h>
#include <Backups/IBackup.h>
@ -162,6 +166,7 @@ namespace ErrorCodes
extern const int CONCURRENT_ACCESS_NOT_SUPPORTED;
extern const int CHECKSUM_DOESNT_MATCH;
extern const int NOT_INITIALIZED;
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
}
namespace ActionLocks
@ -4452,6 +4457,106 @@ SinkToStoragePtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, con
}
std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClusterStorage(const std::shared_ptr<IStorageCluster> & src_storage_cluster, const ASTInsertQuery & query, ContextPtr local_context)
{
const auto & settings = local_context->getSettingsRef();
auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context);
/// Here we don't check that the cluster formed from the table replicas is a subset of the cluster specified in the s3Cluster/hdfsCluster table function
auto src_cluster = src_storage_cluster->getCluster(local_context);
/// The query itself doesn't change; we just serialize it to a string
String query_str;
{
WriteBufferFromOwnString buf;
IAST::FormatSettings ast_format_settings(buf, /*one_line*/ true);
ast_format_settings.always_quote_identifiers = true;
query.IAST::format(ast_format_settings);
query_str = buf.str();
}
QueryPipeline pipeline;
ContextMutablePtr query_context = Context::createCopy(local_context);
++query_context->getClientInfo().distributed_depth;
for (const auto & replicas : src_cluster->getShardsAddresses())
{
/// There will be only one replica, because we consider each replica as a shard
for (const auto & node : replicas)
{
auto connection = std::make_shared<Connection>(
node.host_name, node.port, query_context->getGlobalContext()->getCurrentDatabase(),
node.user, node.password, node.quota_key, node.cluster, node.cluster_secret,
"ParallelInsertSelectInititiator",
node.compression,
node.secure
);
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
connection,
query_str,
Block{},
query_context,
/*throttler=*/nullptr,
Scalars{},
Tables{},
QueryProcessingStage::Complete,
extension);
QueryPipeline remote_pipeline(std::make_shared<RemoteSource>(remote_query_executor, false, settings.async_socket_for_remote));
remote_pipeline.complete(std::make_shared<EmptySink>(remote_query_executor->getHeader()));
pipeline.addCompletedPipeline(std::move(remote_pipeline));
}
}
return pipeline;
}
std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWrite(const ASTInsertQuery & query, ContextPtr local_context)
{
/// Do not enable parallel distributed INSERT SELECT when the query probably comes from another server
if (local_context->getClientInfo().query_kind != ClientInfo::QueryKind::INITIAL_QUERY)
return {};
const Settings & settings = local_context->getSettingsRef();
if (settings.max_distributed_depth && local_context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
auto & select = query.select->as<ASTSelectWithUnionQuery &>();
StoragePtr src_storage;
if (select.list_of_selects->children.size() == 1)
{
if (auto * select_query = select.list_of_selects->children.at(0)->as<ASTSelectQuery>())
{
JoinedTables joined_tables(Context::createCopy(local_context), *select_query);
if (joined_tables.tablesCount() == 1)
{
src_storage = joined_tables.getLeftTableStorage();
}
}
}
if (!src_storage)
return {};
if (auto src_distributed = std::dynamic_pointer_cast<IStorageCluster>(src_storage))
{
return distributedWriteFromClusterStorage(src_distributed, query, local_context);
}
else if (local_context->getClientInfo().distributed_depth == 0)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parallel distributed INSERT SELECT is not possible. Reason: distributed "
"reading into Replicated table is supported only from *Cluster table functions, but got {} storage", src_storage->getName());
}
return {};
}
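
For a ReplicatedMergeTree destination, a hedged recap mirroring the integration test added below: the INSERT statement is forwarded verbatim (per the "query doesn't change" comment above) to every node of the cluster named in the s3Cluster call, which is expected to cover the table's replicas, and the initiator's task iterator splits the S3 objects between them.

-- Statement run on the initiator; every replica in 'first_shard' executes the same text
-- and inserts only the files it is handed:
INSERT INTO insert_select_replicated_local
SELECT * FROM s3Cluster(
    'first_shard',
    'http://minio1:9001/root/data/generated_replicated/*.csv',
    'minio', 'minio123', 'CSV', 'a String, b UInt64')
SETTINGS parallel_distributed_insert_select = 1;

The test stops fetches and merges and then checks system.query_log on the second replica for a non-initial s3Cluster query, confirming that the replica did its own share of the reading.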
bool StorageReplicatedMergeTree::optimize(
const ASTPtr &,
const StorageMetadataPtr &,


@ -4,6 +4,7 @@
#include <atomic>
#include <pcg_random.hpp>
#include <Storages/IStorage.h>
#include <Storages/IStorageCluster.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/MergeTreePartsMover.h>
@ -139,6 +140,8 @@ public:
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
std::optional<QueryPipeline> distributedWrite(const ASTInsertQuery & /*query*/, ContextPtr /*context*/) override;
bool optimize(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
@ -483,6 +486,8 @@ private:
std::mutex last_broken_disks_mutex;
std::set<String> last_broken_disks;
static std::optional<QueryPipeline> distributedWriteFromClusterStorage(const std::shared_ptr<IStorageCluster> & src_storage_cluster, const ASTInsertQuery & query, ContextPtr context);
template <class Func>
void foreachActiveParts(Func && func, bool select_sequential_consistency) const;


@ -51,7 +51,7 @@ StorageS3Cluster::StorageS3Cluster(
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_)
: IStorage(table_id_)
: IStorageCluster(table_id_)
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
, filename(configuration_.url)
, cluster_name(configuration_.cluster_name)
@ -101,11 +101,8 @@ Pipe StorageS3Cluster::read(
{
StorageS3::updateS3Configuration(context, s3_configuration);
auto cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.uri, query_info.query, virtual_block, context);
auto callback = std::make_shared<StorageS3Source::IteratorWrapper>([iterator]() mutable -> String { return iterator->next(); });
auto cluster = getCluster(context);
auto extension = getTaskIteratorExtension(query_info.query, context);
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
Block header =
@ -130,7 +127,6 @@ Pipe StorageS3Cluster::read(
node.secure
);
/// For unknown reason global context is passed to IStorage::read() method
/// So, task_identifier is passed as constructor argument. It is more obvious.
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
@ -142,7 +138,7 @@ Pipe StorageS3Cluster::read(
scalars,
Tables(),
processed_stage,
RemoteQueryExecutor::Extension{.task_iterator = callback});
extension);
pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, add_agg_info, false));
}
@ -165,6 +161,19 @@ QueryProcessingStage::Enum StorageS3Cluster::getQueryProcessingStage(
}
ClusterPtr StorageS3Cluster::getCluster(ContextPtr context) const
{
return context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
}
RemoteQueryExecutor::Extension StorageS3Cluster::getTaskIteratorExtension(ASTPtr query, ContextPtr context) const
{
auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.uri, query, virtual_block, context);
auto callback = std::make_shared<StorageS3Source::IteratorWrapper>([iter = std::move(iterator)]() mutable -> String { return iter->next(); });
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
}
NamesAndTypesList StorageS3Cluster::getVirtuals() const
{
return virtual_columns;


@ -10,6 +10,7 @@
#include "Client/Connection.h"
#include <Interpreters/Cluster.h>
#include <IO/S3Common.h>
#include <Storages/IStorageCluster.h>
#include <Storages/StorageS3.h>
namespace DB
@ -17,7 +18,7 @@ namespace DB
class Context;
class StorageS3Cluster : public IStorage
class StorageS3Cluster : public IStorageCluster
{
public:
StorageS3Cluster(
@ -37,9 +38,11 @@ public:
NamesAndTypesList getVirtuals() const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, ContextPtr context) const override;
ClusterPtr getCluster(ContextPtr context) const override;
private:
StorageS3::S3Configuration s3_configuration;
String filename;
String cluster_name;
String format_name;


@ -20,8 +20,23 @@
</shard>
</cluster_simple>
<!-- A part of the cluster above; it represents only one shard -->
<first_shard>
<shard>
<replica>
<host>s0_0_0</host>
<port>9000</port>
</replica>
<replica>
<host>s0_0_1</host>
<port>9000</port>
</replica>
</shard>
</first_shard>
</remote_servers>
<macros>
<default_cluster_macro>cluster_simple</default_cluster_macro>
</macros>
</clickhouse>
</clickhouse>


@ -1,5 +1,9 @@
from email.errors import HeaderParseError
import logging
import os
import csv
import shutil
import time
import pytest
from helpers.cluster import ClickHouseCluster
@ -19,6 +23,21 @@ S3_DATA = [
def create_buckets_s3(cluster):
minio = cluster.minio_client
for file_number in range(100):
file_name = f"data/generated/file_{file_number}.csv"
os.makedirs(os.path.join(SCRIPT_DIR, "data/generated/"), exist_ok=True)
S3_DATA.append(file_name)
with open(os.path.join(SCRIPT_DIR, file_name), "w+", encoding="utf-8") as f:
# a String, b UInt64
data = []
for number in range(100):
data.append([str(number) * 10, number])
writer = csv.writer(f)
writer.writerows(data)
for file in S3_DATA:
minio.fput_object(
bucket_name=cluster.minio_bucket,
@ -34,10 +53,24 @@ def started_cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"s0_0_0", main_configs=["configs/cluster.xml"], with_minio=True
"s0_0_0",
main_configs=["configs/cluster.xml"],
macros={"replica": "node1", "shard": "shard1"},
with_minio=True,
with_zookeeper=True,
)
cluster.add_instance(
"s0_0_1",
main_configs=["configs/cluster.xml"],
macros={"replica": "replica2", "shard": "shard1"},
with_zookeeper=True,
)
cluster.add_instance(
"s0_1_0",
main_configs=["configs/cluster.xml"],
macros={"replica": "replica1", "shard": "shard2"},
with_zookeeper=True,
)
cluster.add_instance("s0_0_1", main_configs=["configs/cluster.xml"])
cluster.add_instance("s0_1_0", main_configs=["configs/cluster.xml"])
logging.info("Starting cluster...")
cluster.start()
@ -47,6 +80,7 @@ def started_cluster():
yield cluster
finally:
shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/"))
cluster.shutdown()
@ -55,17 +89,17 @@ def test_select_all(started_cluster):
pure_s3 = node.query(
"""
SELECT * from s3(
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
ORDER BY (name, value, polygon)"""
)
# print(pure_s3)
s3_distibuted = node.query(
"""
SELECT * from s3Cluster(
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)"""
)
# print(s3_distibuted)
@ -78,15 +112,15 @@ def test_count(started_cluster):
pure_s3 = node.query(
"""
SELECT count(*) from s3(
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')"""
)
# print(pure_s3)
s3_distibuted = node.query(
"""
SELECT count(*) from s3Cluster(
'cluster_simple', 'http://minio1:9001/root/data/{clickhouse,database}/*',
'cluster_simple', 'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')"""
)
@ -125,13 +159,13 @@ def test_union_all(started_cluster):
SELECT * FROM
(
SELECT * from s3(
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
UNION ALL
SELECT * from s3(
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
)
ORDER BY (name, value, polygon)
@ -143,13 +177,13 @@ def test_union_all(started_cluster):
SELECT * FROM
(
SELECT * from s3Cluster(
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
UNION ALL
SELECT * from s3Cluster(
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
)
ORDER BY (name, value, polygon)
@ -166,12 +200,12 @@ def test_wrong_cluster(started_cluster):
"""
SELECT count(*) from s3Cluster(
'non_existent_cluster',
'http://minio1:9001/root/data/{clickhouse,database}/*',
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
UNION ALL
SELECT count(*) from s3Cluster(
'non_existent_cluster',
'http://minio1:9001/root/data/{clickhouse,database}/*',
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
"""
)
@ -184,14 +218,139 @@ def test_ambiguous_join(started_cluster):
result = node.query(
"""
SELECT l.name, r.value from s3Cluster(
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') as l
JOIN s3Cluster(
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') as r
ON l.name = r.name
"""
)
assert "AMBIGUOUS_COLUMN_NAME" not in result
def test_distributed_insert_select(started_cluster):
first_replica_first_shard = started_cluster.instances["s0_0_0"]
second_replica_first_shard = started_cluster.instances["s0_0_1"]
first_replica_second_shard = started_cluster.instances["s0_1_0"]
first_replica_first_shard.query(
"""DROP TABLE IF EXISTS insert_select_local ON CLUSTER 'cluster_simple';"""
)
first_replica_first_shard.query(
"""DROP TABLE IF EXISTS insert_select_distributed ON CLUSTER 'cluster_simple';"""
)
first_replica_first_shard.query(
"""
CREATE TABLE insert_select_local ON CLUSTER 'cluster_simple' (a String, b UInt64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/insert_select', '{replica}')
ORDER BY (a, b);
"""
)
first_replica_first_shard.query(
"""
CREATE TABLE insert_select_distributed ON CLUSTER 'cluster_simple' as insert_select_local
ENGINE = Distributed('cluster_simple', default, insert_select_local, b % 2);
"""
)
first_replica_first_shard.query(
"""
INSERT INTO insert_select_distributed SETTINGS insert_distributed_sync=1 SELECT * FROM s3Cluster(
'cluster_simple',
'http://minio1:9001/root/data/generated/*.csv', 'minio', 'minio123', 'CSV','a String, b UInt64'
) SETTINGS parallel_distributed_insert_select=1, insert_distributed_sync=1;
"""
)
for line in (
first_replica_first_shard.query("""SELECT * FROM insert_select_local;""")
.strip()
.split("\n")
):
_, b = line.split()
assert int(b) % 2 == 0
for line in (
second_replica_first_shard.query("""SELECT * FROM insert_select_local;""")
.strip()
.split("\n")
):
_, b = line.split()
assert int(b) % 2 == 0
for line in (
first_replica_second_shard.query("""SELECT * FROM insert_select_local;""")
.strip()
.split("\n")
):
_, b = line.split()
assert int(b) % 2 == 1
first_replica_first_shard.query(
"""DROP TABLE IF EXISTS insert_select_local ON CLUSTER 'cluster_simple';"""
)
first_replica_first_shard.query(
"""DROP TABLE IF EXISTS insert_select_distributed ON CLUSTER 'cluster_simple';"""
)
def test_distributed_insert_select_with_replicated(started_cluster):
first_replica_first_shard = started_cluster.instances["s0_0_0"]
second_replica_first_shard = started_cluster.instances["s0_0_1"]
first_replica_first_shard.query(
"""DROP TABLE IF EXISTS insert_select_replicated_local ON CLUSTER 'first_shard';"""
)
first_replica_first_shard.query(
"""
CREATE TABLE insert_select_replicated_local ON CLUSTER 'first_shard' (a String, b UInt64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/insert_select_with_replicated', '{replica}')
ORDER BY (a, b);
"""
)
for replica in [first_replica_first_shard, second_replica_first_shard]:
replica.query(
"""
SYSTEM STOP FETCHES;
"""
)
replica.query(
"""
SYSTEM STOP MERGES;
"""
)
first_replica_first_shard.query(
"""
INSERT INTO insert_select_replicated_local SELECT * FROM s3Cluster(
'first_shard',
'http://minio1:9001/root/data/generated_replicated/*.csv', 'minio', 'minio123', 'CSV','a String, b UInt64'
) SETTINGS parallel_distributed_insert_select=1;
"""
)
for replica in [first_replica_first_shard, second_replica_first_shard]:
replica.query(
"""
SYSTEM FLUSH LOGS;
"""
)
second = int(
second_replica_first_shard.query(
"""SELECT count(*) FROM system.query_log WHERE not is_initial_query and query like '%s3Cluster%';"""
).strip()
)
assert second != 0
first_replica_first_shard.query(
"""DROP TABLE IF EXISTS insert_select_replicated_local ON CLUSTER 'first_shard';"""
)