ClickHouse/src/Storages/StorageDistributed.cpp

2008 lines
78 KiB
C++
Raw Normal View History

2018-12-19 12:38:13 +00:00
#include <Storages/StorageDistributed.h>
#include <Databases/IDatabase.h>
#include <Disks/IDisk.h>
2021-10-15 20:18:20 +00:00
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeUUID.h>
2018-12-19 12:38:13 +00:00
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/ObjectUtils.h>
#include <DataTypes/NestedUtils.h>
2021-07-26 16:48:25 +00:00
#include <Storages/Distributed/DistributedSink.h>
#include <Storages/StorageFactory.h>
2018-12-25 23:14:39 +00:00
#include <Storages/AlterCommands.h>
#include <Storages/getStructureOfRemoteTable.h>
#include <Storages/checkAndGetLiteralArgument.h>
2023-02-24 12:46:09 +00:00
#include <Storages/StorageDummy.h>
2023-03-10 14:36:56 +00:00
#include <Storages/removeGroupingFunctionSpecializations.h>
2020-04-10 09:24:16 +00:00
#include <Columns/ColumnConst.h>
#include <Common/Macros.h>
#include <Common/ProfileEvents.h>
#include <Common/escapeForFileName.h>
2017-07-13 20:58:19 +00:00
#include <Common/typeid_cast.h>
#include <Common/quoteString.h>
#include <Common/randomSeed.h>
#include <Common/formatReadable.h>
2018-12-19 12:38:13 +00:00
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
2018-12-19 12:38:13 +00:00
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTInsertQuery.h>
2018-12-19 12:38:13 +00:00
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/IAST.h>
2021-03-04 17:38:12 +00:00
#include <Analyzer/Utils.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/TableNode.h>
#include <Analyzer/TableFunctionNode.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/JoinNode.h>
#include <Analyzer/QueryTreeBuilder.h>
#include <Analyzer/Passes/QueryAnalysisPass.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Planner/Planner.h>
#include <Planner/Utils.h>
2018-12-19 12:38:13 +00:00
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/ClusterProxy/executeQuery.h>
#include <Interpreters/Cluster.h>
2023-01-24 10:46:47 +00:00
#include <Interpreters/DatabaseAndTableWithAlias.h>
2018-12-19 12:38:13 +00:00
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterDescribeQuery.h>
2018-12-19 12:38:13 +00:00
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/JoinedTables.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/Context.h>
2018-12-19 12:38:13 +00:00
#include <Interpreters/createBlockSelector.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/getClusterName.h>
#include <Interpreters/getTableExpressions.h>
#include <Interpreters/RequiredSourceColumnsVisitor.h>
2023-01-19 10:26:38 +00:00
#include <Interpreters/getCustomKeyFilterForParallelReplicas.h>
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <TableFunctions/TableFunctionView.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Storages/IStorageCluster.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
2021-09-08 18:29:38 +00:00
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/Sources/NullSource.h>
2021-07-14 13:17:30 +00:00
#include <Processors/Sources/RemoteSource.h>
2021-07-23 14:25:35 +00:00
#include <Processors/Sinks/EmptySink.h>
#include <Core/Settings.h>
2023-01-24 10:46:47 +00:00
#include <Core/SettingsEnums.h>
2012-05-21 20:38:34 +00:00
2018-06-05 19:46:49 +00:00
#include <IO/ReadHelpers.h>
2020-11-09 19:07:38 +00:00
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
Fix terribly broken, fragile and potentially cyclic linking Sorry for the clickbaity title. This is about static method ConnectionTimeouts::getHTTPTimeouts(). It was be declared in header IO/ConnectionTimeouts.h, and defined in header IO/ConnectionTimeoutsContext.h (!). This is weird and caused issues with linking on s390x (##45520). There was an attempt to fix some inconsistencies (#45848) but neither did @Algunenano nor me at first really understand why the definition is in the header. Turns out that ConnectionTimeoutsContext.h is only #include'd from source files which are part of the normal server build BUT NOT part of the keeper standalone build (which must be enabled via CMake -DBUILD_STANDALONE_KEEPER=1). This dependency was not documented and as a result, some misguided workarounds were introduced earlier, e.g. https://github.com/ClickHouse/ClickHouse/pull/38475/commits/0341c6c54bd7ac77200b4ca123208b195514ef20 The deeper cause was that getHTTPTimeouts() is passed a "Context". This class is part of the "dbms" libary which is deliberately not linked by the standalone build of clickhouse-keeper. The context is only used to read the settings and the "Settings" class is part of the clickhouse_common library which is linked by clickhouse-keeper already. To resolve this mess, this PR - creates source file IO/ConnectionTimeouts.cpp and moves all ConnectionTimeouts definitions into it, including getHTTPTimeouts(). - breaks the wrong dependency by passing "Settings" instead of "Context" into getHTTPTimeouts(). - resolves the previous hacks
2023-02-03 10:54:49 +00:00
#include <IO/ConnectionTimeouts.h>
2018-06-05 19:46:49 +00:00
2015-02-10 21:10:58 +00:00
#include <memory>
#include <filesystem>
#include <optional>
2020-09-18 19:25:56 +00:00
#include <cassert>
2016-12-12 03:33:34 +00:00
2021-04-27 00:05:43 +00:00
namespace fs = std::filesystem;
namespace
{
2020-03-09 01:03:43 +00:00
const UInt64 FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_HAS_SHARDING_KEY = 1;
const UInt64 FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_ALWAYS = 2;
const UInt64 DISTRIBUTED_GROUP_BY_NO_MERGE_AFTER_AGGREGATION = 2;
const UInt64 PARALLEL_DISTRIBUTED_INSERT_SELECT_ALL = 2;
}
namespace ProfileEvents
{
extern const Event DistributedRejectedInserts;
extern const Event DistributedDelayedInserts;
extern const Event DistributedDelayedInsertsMilliseconds;
}
2012-05-21 20:38:34 +00:00
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
2020-02-25 18:02:41 +00:00
extern const int NOT_IMPLEMENTED;
extern const int STORAGE_REQUIRES_PARAMETER;
extern const int BAD_ARGUMENTS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int INCORRECT_NUMBER_OF_COLUMNS;
extern const int INFINITE_LOOP;
extern const int ILLEGAL_FINAL;
2018-06-05 19:46:49 +00:00
extern const int TYPE_MISMATCH;
2018-12-19 12:38:13 +00:00
extern const int TOO_MANY_ROWS;
extern const int UNABLE_TO_SKIP_UNUSED_SHARDS;
extern const int INVALID_SHARD_ID;
extern const int ALTER_OF_COLUMN_IS_FORBIDDEN;
extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES;
extern const int ARGUMENT_OUT_OF_BOUND;
Improvements for `parallel_distributed_insert_select` (and related) (#34728) * Add a warning if parallel_distributed_insert_select was ignored Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Respect max_distributed_depth for parallel_distributed_insert_select Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Print warning for non applied parallel_distributed_insert_select only for initial query Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Remove Cluster::getHashOfAddresses() Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Forbid parallel_distributed_insert_select for remote()/cluster() with different addresses Before it uses empty cluster name (getClusterName()) which is not correct, compare all addresses instead. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Fix max_distributed_depth check max_distributed_depth=1 must mean not more then one distributed query, not two, since max_distributed_depth=0 means no limit, and distribute_depth is 0 for the first query. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Fix INSERT INTO remote()/cluster() with parallel_distributed_insert_select Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Add a test for parallel_distributed_insert_select with cluster()/remote() Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Return <remote> instead of empty cluster name in Distributed engine Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Make user with sharding_key and w/o in remote()/cluster() identical Before with sharding_key the user was "default", while w/o it it was empty. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-03-08 14:24:39 +00:00
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
extern const int DISTRIBUTED_IN_JOIN_SUBQUERY_DENIED;
}
namespace ActionLocks
{
2019-04-22 15:11:16 +00:00
extern const StorageActionBlockType DistributedSend;
}
2014-08-21 12:07:29 +00:00
namespace
{
/// Calculate maximum number in file names in directory and all subdirectories.
/// To ensure global order of data blocks yet to be sent across server restarts.
UInt64 getMaximumFileNumber(const std::string & dir_path)
{
UInt64 res = 0;
std::filesystem::recursive_directory_iterator begin(dir_path);
std::filesystem::recursive_directory_iterator end;
for (auto it = begin; it != end; ++it)
{
const auto & file_path = it->path();
if (!std::filesystem::is_regular_file(*it) || !endsWith(file_path.filename().string(), ".bin"))
continue;
UInt64 num = 0;
try
{
num = parse<UInt64>(file_path.filename().stem().string());
}
catch (Exception & e)
{
e.addMessage("Unexpected file name " + file_path.filename().string() + " found at " + file_path.parent_path().string() + ", should have numeric base name.");
throw;
}
if (num > res)
res = num;
}
return res;
}
std::string makeFormattedListOfShards(const ClusterPtr & cluster)
{
2020-11-09 19:07:38 +00:00
WriteBufferFromOwnString buf;
bool head = true;
2020-11-09 19:07:38 +00:00
buf << "[";
for (const auto & shard_info : cluster->getShardsInfo())
{
2020-11-09 19:07:38 +00:00
(head ? buf : buf << ", ") << shard_info.shard_num;
head = false;
}
2020-11-09 19:07:38 +00:00
buf << "]";
2020-11-09 19:07:38 +00:00
return buf.str();
}
ExpressionActionsPtr buildShardingKeyExpression(const ASTPtr & sharding_key, ContextPtr context, const NamesAndTypesList & columns, bool project)
{
ASTPtr query = sharding_key;
auto syntax_result = TreeRewriter(context).analyze(query, columns);
return ExpressionAnalyzer(query, syntax_result, context).getActions(project);
}
2021-06-28 17:02:22 +00:00
bool isExpressionActionsDeterministic(const ExpressionActionsPtr & actions)
{
for (const auto & action : actions->getActions())
{
2020-11-10 14:54:59 +00:00
if (action.node->type != ActionsDAG::ActionType::FUNCTION)
continue;
2020-11-03 11:28:28 +00:00
if (!action.node->function_base->isDeterministic())
return false;
}
return true;
}
class ReplacingConstantExpressionsMatcher
{
public:
using Data = Block;
static bool needChildVisit(ASTPtr &, const ASTPtr &)
{
return true;
}
static void visit(ASTPtr & node, Block & block_with_constants)
{
if (!node->as<ASTFunction>())
return;
std::string name = node->getColumnName();
if (block_with_constants.has(name))
{
auto result = block_with_constants.getByName(name);
if (!isColumnConst(*result.column))
return;
2020-03-23 17:28:38 +00:00
node = std::make_shared<ASTLiteral>(assert_cast<const ColumnConst &>(*result.column).getField());
}
}
};
2020-03-23 17:28:38 +00:00
2020-06-18 09:08:24 +00:00
void replaceConstantExpressions(
ASTPtr & node,
ContextPtr context,
2020-06-18 09:08:24 +00:00
const NamesAndTypesList & columns,
ConstStoragePtr storage,
const StorageSnapshotPtr & storage_snapshot)
{
auto syntax_result = TreeRewriter(context).analyze(node, columns, storage, storage_snapshot);
Block block_with_constants = KeyCondition::getBlockWithConstants(node, syntax_result, context);
InDepthNodeVisitor<ReplacingConstantExpressionsMatcher, true> visitor(block_with_constants);
visitor.visit(node);
}
size_t getClusterQueriedNodes(const Settings & settings, const ClusterPtr & cluster)
{
size_t num_local_shards = cluster->getLocalShardCount();
size_t num_remote_shards = cluster->getRemoteShardCount();
return (num_remote_shards + num_local_shards) * settings.max_parallel_replicas;
}
2020-03-26 18:25:26 +00:00
}
/// For destruction of std::unique_ptr of type that is incomplete in class definition.
StorageDistributed::~StorageDistributed() = default;
NamesAndTypesList StorageDistributed::getVirtuals() const
2020-04-27 13:55:30 +00:00
{
/// NOTE This is weird. Most of these virtual columns are part of MergeTree
/// tables info. But Distributed is general-purpose engine.
return NamesAndTypesList{
2023-02-02 16:33:31 +00:00
NameAndTypePair("_table", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())),
NameAndTypePair("_part", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())),
2022-02-05 08:10:28 +00:00
NameAndTypePair("_part_index", std::make_shared<DataTypeUInt64>()),
NameAndTypePair("_part_uuid", std::make_shared<DataTypeUUID>()),
NameAndTypePair("_partition_id", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())),
2022-02-05 08:10:28 +00:00
NameAndTypePair("_sample_factor", std::make_shared<DataTypeFloat64>()),
NameAndTypePair("_part_offset", std::make_shared<DataTypeUInt64>()),
2022-07-18 07:36:28 +00:00
NameAndTypePair("_row_exists", std::make_shared<DataTypeUInt8>()),
2022-02-05 08:10:28 +00:00
NameAndTypePair("_shard_num", std::make_shared<DataTypeUInt32>()), /// deprecated
2020-04-27 13:55:30 +00:00
};
}
StorageDistributed::StorageDistributed(
2019-12-04 16:06:55 +00:00
const StorageID & id_,
const ColumnsDescription & columns_,
2019-08-24 21:20:20 +00:00
const ConstraintsDescription & constraints_,
2021-04-23 12:18:23 +00:00
const String & comment,
const String & remote_database_,
const String & remote_table_,
const String & cluster_name_,
ContextPtr context_,
const ASTPtr & sharding_key_,
2020-07-23 14:10:48 +00:00
const String & storage_policy_name_,
2019-10-25 19:07:47 +00:00
const String & relative_data_path_,
const DistributedSettings & distributed_settings_,
bool attach_,
2021-08-20 14:05:53 +00:00
ClusterPtr owned_cluster_,
ASTPtr remote_table_function_ptr_)
2020-04-27 13:55:30 +00:00
: IStorage(id_)
, WithContext(context_->getGlobalContext())
2019-12-04 16:06:55 +00:00
, remote_database(remote_database_)
, remote_table(remote_table_)
2021-08-20 14:05:53 +00:00
, remote_table_function_ptr(remote_table_function_ptr_)
2020-05-30 21:57:37 +00:00
, log(&Poco::Logger::get("StorageDistributed (" + id_.table_name + ")"))
, owned_cluster(std::move(owned_cluster_))
, cluster_name(getContext()->getMacros()->expand(cluster_name_))
2019-12-04 16:06:55 +00:00
, has_sharding_key(sharding_key_)
, relative_data_path(relative_data_path_)
, distributed_settings(distributed_settings_)
, rng(randomSeed())
{
2020-06-19 15:39:41 +00:00
StorageInMemoryMetadata storage_metadata;
if (columns_.empty())
{
StorageID id = StorageID::createEmpty();
id.table_name = remote_table;
id.database_name = remote_database;
storage_metadata.setColumns(getStructureOfRemoteTable(*getCluster(), id, getContext(), remote_table_function_ptr));
}
else
storage_metadata.setColumns(columns_);
2020-06-19 15:39:41 +00:00
storage_metadata.setConstraints(constraints_);
2021-04-23 12:18:23 +00:00
storage_metadata.setComment(comment);
2020-06-19 15:39:41 +00:00
setInMemoryMetadata(storage_metadata);
2019-08-24 21:20:20 +00:00
2019-08-26 13:46:07 +00:00
if (sharding_key_)
{
sharding_key_expr = buildShardingKeyExpression(sharding_key_, getContext(), storage_metadata.getColumns().getAllPhysical(), false);
2019-08-26 13:46:07 +00:00
sharding_key_column_name = sharding_key_->getColumnName();
2021-06-28 17:02:22 +00:00
sharding_key_is_deterministic = isExpressionActionsDeterministic(sharding_key_expr);
2019-08-26 13:46:07 +00:00
}
if (!relative_data_path.empty())
2020-07-23 14:10:48 +00:00
{
storage_policy = getContext()->getStoragePolicy(storage_policy_name_);
data_volume = storage_policy->getVolume(0);
if (storage_policy->getVolumes().size() > 1)
LOG_WARNING(log, "Storage policy for Distributed table has multiple volumes. "
"Only {} volume will be used to store data. Other will be ignored.", data_volume->getName());
2020-07-23 14:10:48 +00:00
}
/// Sanity check. Skip check if the table is already created to allow the server to start.
if (!attach_)
{
if (remote_database.empty() && !remote_table_function_ptr && !getCluster()->maybeCrossReplication())
LOG_WARNING(log, "Name of remote database is empty. Default database will be used implicitly.");
size_t num_local_shards = getCluster()->getLocalShardCount();
if (num_local_shards && (remote_database.empty() || remote_database == id_.database_name) && remote_table == id_.table_name)
throw Exception(ErrorCodes::INFINITE_LOOP, "Distributed table {} looks at itself", id_.table_name);
}
initializeFromDisk();
}
StorageDistributed::StorageDistributed(
2019-12-04 16:06:55 +00:00
const StorageID & id_,
const ColumnsDescription & columns_,
2019-08-24 21:20:20 +00:00
const ConstraintsDescription & constraints_,
ASTPtr remote_table_function_ptr_,
const String & cluster_name_,
ContextPtr context_,
const ASTPtr & sharding_key_,
2020-07-23 14:10:48 +00:00
const String & storage_policy_name_,
2019-10-25 19:07:47 +00:00
const String & relative_data_path_,
const DistributedSettings & distributed_settings_,
bool attach,
ClusterPtr owned_cluster_)
2021-04-23 12:18:23 +00:00
: StorageDistributed(
id_,
columns_,
constraints_,
String{},
String{},
String{},
cluster_name_,
context_,
sharding_key_,
storage_policy_name_,
relative_data_path_,
distributed_settings_,
attach,
2021-08-20 14:05:53 +00:00
std::move(owned_cluster_),
remote_table_function_ptr_)
{
}
2020-11-07 21:30:40 +00:00
QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
ContextPtr local_context,
QueryProcessingStage::Enum to_stage,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info) const
{
const auto & settings = local_context->getSettingsRef();
ClusterPtr cluster = getCluster();
size_t nodes = getClusterQueriedNodes(settings, cluster);
2023-03-03 15:14:49 +00:00
if (query_info.use_custom_key)
{
2023-01-19 11:28:26 +00:00
LOG_INFO(log, "Single shard cluster used with custom_key, transforming replicas into virtual shards");
query_info.cluster = cluster->getClusterWithReplicasAsShards(settings, settings.max_parallel_replicas);
}
else
{
query_info.cluster = cluster;
2023-01-19 11:28:26 +00:00
if (nodes > 1 && settings.optimize_skip_unused_shards)
{
2023-01-19 11:28:26 +00:00
/// Always calculate optimized cluster here, to avoid conditions during read()
/// (Anyway it will be calculated in the read())
ClusterPtr optimized_cluster = getOptimizedCluster(local_context, storage_snapshot, query_info.query);
if (optimized_cluster)
{
LOG_DEBUG(log, "Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): {}",
makeFormattedListOfShards(optimized_cluster));
cluster = optimized_cluster;
query_info.optimized_cluster = cluster;
nodes = getClusterQueriedNodes(settings, cluster);
}
else
{
LOG_DEBUG(log, "Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - the query will be sent to all shards of the cluster{}",
has_sharding_key ? "" : " (no sharding key)");
}
}
}
if (settings.distributed_group_by_no_merge)
{
if (settings.distributed_group_by_no_merge == DISTRIBUTED_GROUP_BY_NO_MERGE_AFTER_AGGREGATION)
{
if (settings.distributed_push_down_limit)
return QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
else
return QueryProcessingStage::WithMergeableStateAfterAggregation;
}
else
{
/// NOTE: distributed_group_by_no_merge=1 does not respect distributed_push_down_limit
2021-06-28 17:02:22 +00:00
/// (since in this case queries processed separately and the initiator is just a proxy in this case).
if (to_stage != QueryProcessingStage::Complete)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Queries with distributed_group_by_no_merge=1 should be processed to Complete stage");
return QueryProcessingStage::Complete;
}
}
/// Nested distributed query cannot return Complete stage,
/// since the parent query need to aggregate the results after.
if (to_stage == QueryProcessingStage::WithMergeableState)
return QueryProcessingStage::WithMergeableState;
/// If there is only one node, the query can be fully processed by the
/// shard, initiator will work as a proxy only.
if (nodes == 1)
{
/// In case the query was processed to
/// WithMergeableStateAfterAggregation/WithMergeableStateAfterAggregationAndLimit
/// (which are greater the Complete stage)
/// we cannot return Complete (will break aliases and similar),
/// relevant for Distributed over Distributed
return std::max(to_stage, QueryProcessingStage::Complete);
}
else if (nodes == 0)
{
/// In case of 0 shards, the query should be processed fully on the initiator,
/// since we need to apply aggregations.
/// That's why we need to return FetchColumns.
return QueryProcessingStage::FetchColumns;
}
auto optimized_stage = getOptimizedQueryProcessingStage(query_info, settings);
if (optimized_stage)
{
if (*optimized_stage == QueryProcessingStage::Complete)
return std::min(to_stage, *optimized_stage);
return *optimized_stage;
}
return QueryProcessingStage::WithMergeableState;
}
std::optional<QueryProcessingStage::Enum> StorageDistributed::getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, const Settings & settings) const
{
bool optimize_sharding_key_aggregation =
settings.optimize_skip_unused_shards &&
settings.optimize_distributed_group_by_sharding_key &&
has_sharding_key &&
(settings.allow_nondeterministic_optimize_skip_unused_shards || sharding_key_is_deterministic);
QueryProcessingStage::Enum default_stage = QueryProcessingStage::WithMergeableStateAfterAggregation;
if (settings.distributed_push_down_limit)
default_stage = QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
const auto & select = query_info.query->as<ASTSelectQuery &>();
auto expr_contains_sharding_key = [&](const auto & exprs) -> bool
{
std::unordered_set<std::string> expr_columns;
for (auto & expr : exprs)
{
auto id = expr->template as<ASTIdentifier>();
if (!id)
continue;
expr_columns.emplace(id->name());
}
for (const auto & column : sharding_key_expr->getRequiredColumns())
{
if (!expr_columns.contains(column))
return false;
}
return true;
};
// GROUP BY qualifiers
// - TODO: WITH TOTALS can be implemented
// - TODO: WITH ROLLUP can be implemented (I guess)
if (select.group_by_with_totals || select.group_by_with_rollup || select.group_by_with_cube)
return {};
// Window functions are not supported.
if (query_info.has_window)
return {};
// TODO: extremes support can be implemented
if (settings.extremes)
return {};
// DISTINCT
if (select.distinct)
{
if (!optimize_sharding_key_aggregation || !expr_contains_sharding_key(select.select()->children))
return {};
}
// GROUP BY
const ASTPtr group_by = select.groupBy();
bool has_aggregates = query_info.has_aggregates;
if (query_info.syntax_analyzer_result)
2023-01-13 16:53:53 +00:00
has_aggregates = !query_info.syntax_analyzer_result->aggregates.empty();
2023-01-13 16:53:53 +00:00
if (has_aggregates || group_by)
{
if (!optimize_sharding_key_aggregation || !group_by || !expr_contains_sharding_key(group_by->children))
return {};
}
// LIMIT BY
if (const ASTPtr limit_by = select.limitBy())
{
if (!optimize_sharding_key_aggregation || !expr_contains_sharding_key(limit_by->children))
return {};
}
// ORDER BY
if (const ASTPtr order_by = select.orderBy())
return default_stage;
// LIMIT
// OFFSET
if (select.limitLength() || select.limitOffset())
return default_stage;
// Only simple SELECT FROM GROUP BY sharding_key can use Complete state.
return QueryProcessingStage::Complete;
2020-03-18 00:57:00 +00:00
}
static bool requiresObjectColumns(const ColumnsDescription & all_columns, ASTPtr query)
{
2022-05-06 14:44:00 +00:00
if (!hasDynamicSubcolumns(all_columns))
return false;
if (!query)
return true;
RequiredSourceColumnsVisitor::Data columns_context;
RequiredSourceColumnsVisitor(columns_context).visit(query);
auto required_columns = columns_context.requiredColumns();
for (const auto & required_column : required_columns)
{
auto name_in_storage = Nested::splitName(required_column).first;
auto column_in_storage = all_columns.tryGetPhysical(name_in_storage);
2022-05-06 14:44:00 +00:00
if (column_in_storage && column_in_storage->type->hasDynamicSubcolumns())
return true;
}
return false;
}
StorageSnapshotPtr StorageDistributed::getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const
{
return getStorageSnapshotForQuery(metadata_snapshot, nullptr, query_context);
}
StorageSnapshotPtr StorageDistributed::getStorageSnapshotForQuery(
const StorageMetadataPtr & metadata_snapshot, const ASTPtr & query, ContextPtr /*query_context*/) const
{
/// If query doesn't use columns of type Object, don't deduce
2022-03-01 16:32:55 +00:00
/// concrete types for them, because it required extra round trip.
2021-07-24 00:55:50 +00:00
auto snapshot_data = std::make_unique<SnapshotData>();
if (!requiresObjectColumns(metadata_snapshot->getColumns(), query))
2021-07-24 00:55:50 +00:00
return std::make_shared<StorageSnapshot>(*this, metadata_snapshot, ColumnsDescription{}, std::move(snapshot_data));
snapshot_data->objects_by_shard = getExtendedObjectsOfRemoteTables(
*getCluster(),
StorageID{remote_database, remote_table},
metadata_snapshot->getColumns(),
getContext());
2022-05-06 14:44:00 +00:00
auto object_columns = DB::getConcreteObjectColumns(
snapshot_data->objects_by_shard.begin(),
snapshot_data->objects_by_shard.end(),
metadata_snapshot->getColumns(),
2022-03-01 17:20:53 +00:00
[](const auto & shard_num_and_columns) -> const auto & { return shard_num_and_columns.second; });
return std::make_shared<StorageSnapshot>(*this, metadata_snapshot, object_columns, std::move(snapshot_data));
}
2023-01-10 11:52:29 +00:00
namespace
{
/// Visitor that collect column source to columns mapping from query and all subqueries
class CollectColumnSourceToColumnsVisitor : public InDepthQueryTreeVisitor<CollectColumnSourceToColumnsVisitor>
{
public:
struct Columns
{
NameSet column_names;
NamesAndTypes columns;
void addColumn(NameAndTypePair column)
{
if (column_names.contains(column.name))
return;
column_names.insert(column.name);
columns.push_back(std::move(column));
}
};
const std::unordered_map<QueryTreeNodePtr, Columns> & getColumnSourceToColumns() const
{
return column_source_to_columns;
}
void visitImpl(QueryTreeNodePtr & node)
{
auto * column_node = node->as<ColumnNode>();
if (!column_node)
return;
auto column_source = column_node->getColumnSourceOrNull();
if (!column_source)
return;
auto it = column_source_to_columns.find(column_source);
if (it == column_source_to_columns.end())
{
auto [insert_it, _] = column_source_to_columns.emplace(column_source, Columns());
it = insert_it;
}
it->second.addColumn(column_node->getColumn());
}
private:
std::unordered_map<QueryTreeNodePtr, Columns> column_source_to_columns;
};
/** Visitor that rewrites IN and JOINs in query and all subqueries according to distributed_product_mode and
* prefer_global_in_and_join settings.
*
* Additionally collects GLOBAL JOIN and GLOBAL IN query nodes.
*
* If distributed_product_mode = deny, then visitor throws exception if there are multiple distributed tables.
* If distributed_product_mode = local, then visitor collects replacement map for tables that must be replaced
* with local tables.
* If distributed_product_mode = global or prefer_global_in_and_join setting is true, then visitor rewrites JOINs and IN functions that
* contain distributed tables to GLOBAL JOINs and GLOBAL IN functions.
* If distributed_product_mode = allow, then visitor does not rewrite query if there are multiple distributed tables.
*/
class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisitorWithContext<DistributedProductModeRewriteInJoinVisitor>
{
public:
using Base = InDepthQueryTreeVisitorWithContext<DistributedProductModeRewriteInJoinVisitor>;
using Base::Base;
explicit DistributedProductModeRewriteInJoinVisitor(const ContextPtr & context_)
: Base(context_)
{}
struct InFunctionOrJoin
{
QueryTreeNodePtr query_node;
size_t subquery_depth = 0;
};
const std::unordered_map<const IQueryTreeNode *, QueryTreeNodePtr> & getReplacementMap() const
{
return replacement_map;
}
const std::vector<InFunctionOrJoin> & getGlobalInOrJoinNodes() const
{
return global_in_or_join_nodes;
}
2023-03-04 17:46:40 +00:00
static bool needChildVisit(QueryTreeNodePtr & parent, QueryTreeNodePtr & child)
{
auto * function_node = parent->as<FunctionNode>();
if (function_node && isNameOfGlobalInFunction(function_node->getFunctionName()))
return false;
auto * join_node = parent->as<JoinNode>();
if (join_node && join_node->getLocality() == JoinLocality::Global && join_node->getRightTableExpression() == child)
return false;
return true;
}
void visitImpl(QueryTreeNodePtr & node)
{
auto * function_node = node->as<FunctionNode>();
auto * join_node = node->as<JoinNode>();
if ((function_node && isNameOfGlobalInFunction(function_node->getFunctionName())) ||
(join_node && join_node->getLocality() == JoinLocality::Global))
{
InFunctionOrJoin in_function_or_join_entry;
in_function_or_join_entry.query_node = node;
in_function_or_join_entry.subquery_depth = getSubqueryDepth();
global_in_or_join_nodes.push_back(std::move(in_function_or_join_entry));
return;
}
if ((function_node && isNameOfLocalInFunction(function_node->getFunctionName())) ||
(join_node && join_node->getLocality() != JoinLocality::Global))
{
InFunctionOrJoin in_function_or_join_entry;
in_function_or_join_entry.query_node = node;
in_function_or_join_entry.subquery_depth = getSubqueryDepth();
in_function_or_join_stack.push_back(in_function_or_join_entry);
return;
}
if (node->getNodeType() == QueryTreeNodeType::TABLE)
tryRewriteTableNodeIfNeeded(node);
}
void leaveImpl(QueryTreeNodePtr & node)
{
if (!in_function_or_join_stack.empty() && node.get() == in_function_or_join_stack.back().query_node.get())
in_function_or_join_stack.pop_back();
}
private:
void tryRewriteTableNodeIfNeeded(const QueryTreeNodePtr & table_node)
{
const auto & table_node_typed = table_node->as<TableNode &>();
const auto * distributed_storage = typeid_cast<const StorageDistributed *>(table_node_typed.getStorage().get());
if (!distributed_storage)
return;
bool distributed_valid_for_rewrite = distributed_storage->getShardCount() >= 2;
if (!distributed_valid_for_rewrite)
return;
auto distributed_product_mode = getSettings().distributed_product_mode;
if (distributed_product_mode == DistributedProductMode::LOCAL)
{
StorageID remote_storage_id = StorageID{distributed_storage->getRemoteDatabaseName(),
distributed_storage->getRemoteTableName()};
auto resolved_remote_storage_id = getContext()->resolveStorageID(remote_storage_id);
const auto & distributed_storage_columns = table_node_typed.getStorageSnapshot()->metadata->getColumns();
auto storage = std::make_shared<StorageDummy>(resolved_remote_storage_id, distributed_storage_columns);
auto replacement_table_expression = std::make_shared<TableNode>(std::move(storage), getContext());
replacement_map.emplace(table_node.get(), std::move(replacement_table_expression));
}
else if ((distributed_product_mode == DistributedProductMode::GLOBAL || getSettings().prefer_global_in_and_join) &&
!in_function_or_join_stack.empty())
{
auto * in_or_join_node_to_modify = in_function_or_join_stack.back().query_node.get();
if (auto * in_function_to_modify = in_or_join_node_to_modify->as<FunctionNode>())
{
auto global_in_function_name = getGlobalInFunctionNameForLocalInFunctionName(in_function_to_modify->getFunctionName());
auto global_in_function_resolver = FunctionFactory::instance().get(global_in_function_name, getContext());
in_function_to_modify->resolveAsFunction(global_in_function_resolver->build(in_function_to_modify->getArgumentColumns()));
}
else if (auto * join_node_to_modify = in_or_join_node_to_modify->as<JoinNode>())
{
join_node_to_modify->setLocality(JoinLocality::Global);
}
global_in_or_join_nodes.push_back(in_function_or_join_stack.back());
}
else if (distributed_product_mode == DistributedProductMode::ALLOW)
{
return;
}
else if (distributed_product_mode == DistributedProductMode::DENY)
{
throw Exception(ErrorCodes::DISTRIBUTED_IN_JOIN_SUBQUERY_DENIED,
"Double-distributed IN/JOIN subqueries is denied (distributed_product_mode = 'deny'). "
"You may rewrite query to use local tables "
"in subqueries, or use GLOBAL keyword, or set distributed_product_mode to suitable value.");
}
}
std::vector<InFunctionOrJoin> in_function_or_join_stack;
std::unordered_map<const IQueryTreeNode *, QueryTreeNodePtr> replacement_map;
std::vector<InFunctionOrJoin> global_in_or_join_nodes;
};
/** Execute subquery node and put result in mutable context temporary table.
* Returns table node that is initialized with temporary table storage.
*/
QueryTreeNodePtr executeSubqueryNode(const QueryTreeNodePtr & subquery_node,
ContextMutablePtr & mutable_context,
size_t subquery_depth)
{
auto subquery_hash = subquery_node->getTreeHash();
String temporary_table_name = fmt::format("_data_{}_{}", subquery_hash.first, subquery_hash.second);
const auto & external_tables = mutable_context->getExternalTables();
auto external_table_it = external_tables.find(temporary_table_name);
if (external_table_it != external_tables.end())
{
auto temporary_table_expression_node = std::make_shared<TableNode>(external_table_it->second, mutable_context);
temporary_table_expression_node->setTemporaryTableName(temporary_table_name);
return temporary_table_expression_node;
}
auto subquery_options = SelectQueryOptions(QueryProcessingStage::Complete, subquery_depth, true /*is_subquery*/);
auto context_copy = Context::createCopy(mutable_context);
updateContextForSubqueryExecution(context_copy);
2023-03-04 17:46:40 +00:00
InterpreterSelectQueryAnalyzer interpreter(subquery_node, context_copy, subquery_options);
auto & query_plan = interpreter.getQueryPlan();
auto sample_block_with_unique_names = query_plan.getCurrentDataStream().header;
makeUniqueColumnNamesInBlock(sample_block_with_unique_names);
if (!blocksHaveEqualStructure(sample_block_with_unique_names, query_plan.getCurrentDataStream().header))
{
auto actions_dag = ActionsDAG::makeConvertingActions(
query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
sample_block_with_unique_names.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Position);
auto converting_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), std::move(actions_dag));
query_plan.addStep(std::move(converting_step));
}
Block sample = interpreter.getSampleBlock();
NamesAndTypesList columns = sample.getNamesAndTypesList();
auto external_storage_holder = TemporaryTableHolder(
mutable_context,
ColumnsDescription{columns},
ConstraintsDescription{},
nullptr /*query*/,
true /*create_for_global_subquery*/);
StoragePtr external_storage = external_storage_holder.getTable();
auto temporary_table_expression_node = std::make_shared<TableNode>(external_storage, mutable_context);
temporary_table_expression_node->setTemporaryTableName(temporary_table_name);
auto table_out = external_storage->write({}, external_storage->getInMemoryMetadataPtr(), mutable_context);
auto io = interpreter.execute();
io.pipeline.complete(std::move(table_out));
CompletedPipelineExecutor executor(io.pipeline);
executor.execute();
mutable_context->addExternalTable(temporary_table_name, std::move(external_storage_holder));
return temporary_table_expression_node;
}
QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info,
const StorageSnapshotPtr & distributed_storage_snapshot,
const StorageID & remote_storage_id,
const ASTPtr & remote_table_function)
{
auto & planner_context = query_info.planner_context;
const auto & query_context = planner_context->getQueryContext();
std::optional<TableExpressionModifiers> table_expression_modifiers;
if (auto * query_info_table_node = query_info.table_expression->as<TableNode>())
table_expression_modifiers = query_info_table_node->getTableExpressionModifiers();
else if (auto * query_info_table_function_node = query_info.table_expression->as<TableFunctionNode>())
table_expression_modifiers = query_info_table_function_node->getTableExpressionModifiers();
QueryTreeNodePtr replacement_table_expression;
if (remote_table_function)
{
auto remote_table_function_query_tree = buildQueryTree(remote_table_function, query_context);
auto & remote_table_function_node = remote_table_function_query_tree->as<FunctionNode &>();
auto table_function_node = std::make_shared<TableFunctionNode>(remote_table_function_node.getFunctionName());
table_function_node->getArgumentsNode() = remote_table_function_node.getArgumentsNode();
if (table_expression_modifiers)
table_function_node->setTableExpressionModifiers(*table_expression_modifiers);
QueryAnalysisPass query_analysis_pass;
query_analysis_pass.run(table_function_node, query_context);
replacement_table_expression = std::move(table_function_node);
}
else
{
auto resolved_remote_storage_id = query_context->resolveStorageID(remote_storage_id);
2023-02-25 19:16:51 +00:00
auto storage = std::make_shared<StorageDummy>(resolved_remote_storage_id, distributed_storage_snapshot->metadata->getColumns());
auto table_node = std::make_shared<TableNode>(std::move(storage), query_context);
if (table_expression_modifiers)
table_node->setTableExpressionModifiers(*table_expression_modifiers);
replacement_table_expression = std::move(table_node);
}
replacement_table_expression->setAlias(query_info.table_expression->getAlias());
auto query_tree_to_modify = query_info.query_tree->cloneAndReplace(query_info.table_expression, std::move(replacement_table_expression));
CollectColumnSourceToColumnsVisitor collect_column_source_to_columns_visitor;
collect_column_source_to_columns_visitor.visit(query_tree_to_modify);
const auto & column_source_to_columns = collect_column_source_to_columns_visitor.getColumnSourceToColumns();
DistributedProductModeRewriteInJoinVisitor visitor(query_info.planner_context->getQueryContext());
visitor.visit(query_tree_to_modify);
auto replacement_map = visitor.getReplacementMap();
const auto & global_in_or_join_nodes = visitor.getGlobalInOrJoinNodes();
for (const auto & global_in_or_join_node : global_in_or_join_nodes)
{
if (auto * join_node = global_in_or_join_node.query_node->as<JoinNode>())
{
auto join_right_table_expression = join_node->getRightTableExpression();
auto join_right_table_expression_node_type = join_right_table_expression->getNodeType();
QueryTreeNodePtr subquery_node;
if (join_right_table_expression_node_type == QueryTreeNodeType::QUERY ||
join_right_table_expression_node_type == QueryTreeNodeType::UNION)
{
subquery_node = join_right_table_expression;
}
else if (join_right_table_expression_node_type == QueryTreeNodeType::TABLE ||
join_right_table_expression_node_type == QueryTreeNodeType::TABLE_FUNCTION)
{
const auto & columns = column_source_to_columns.at(join_right_table_expression).columns;
subquery_node = buildSubqueryToReadColumnsFromTableExpression(columns,
join_right_table_expression,
planner_context->getQueryContext());
}
else
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
2023-03-03 18:03:40 +00:00
"Expected JOIN right table expression to be table, table function, query or union node. Actual {}",
join_right_table_expression->formatASTForErrorMessage());
}
auto temporary_table_expression_node = executeSubqueryNode(subquery_node,
planner_context->getMutableQueryContext(),
global_in_or_join_node.subquery_depth);
temporary_table_expression_node->setAlias(join_right_table_expression->getAlias());
replacement_map.emplace(join_right_table_expression.get(), std::move(temporary_table_expression_node));
continue;
}
else if (auto * in_function_node = global_in_or_join_node.query_node->as<FunctionNode>())
{
auto & in_function_subquery_node = in_function_node->getArguments().getNodes().at(1);
2023-03-04 17:46:40 +00:00
auto in_function_node_type = in_function_subquery_node->getNodeType();
if (in_function_node_type != QueryTreeNodeType::QUERY && in_function_node_type != QueryTreeNodeType::UNION)
continue;
auto temporary_table_expression_node = executeSubqueryNode(in_function_subquery_node,
planner_context->getMutableQueryContext(),
global_in_or_join_node.subquery_depth);
in_function_subquery_node = std::move(temporary_table_expression_node);
}
else
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Expected global IN or JOIN query node. Actual {}",
global_in_or_join_node.query_node->formatASTForErrorMessage());
}
}
if (!replacement_map.empty())
query_tree_to_modify = query_tree_to_modify->cloneAndReplace(replacement_map);
2023-03-14 12:04:39 +00:00
removeGroupingFunctionSpecializations(query_tree_to_modify);
return query_tree_to_modify;
}
2023-01-10 11:52:29 +00:00
}
2020-09-25 13:19:26 +00:00
void StorageDistributed::read(
QueryPlan & query_plan,
const Names &,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
const size_t /*max_block_size*/,
const size_t /*num_streams*/)
{
const auto * select_query = query_info.query->as<ASTSelectQuery>();
if (select_query->final() && local_context->getSettingsRef().allow_experimental_parallel_reading_from_replicas)
throw Exception(ErrorCodes::ILLEGAL_FINAL, "Final modifier is not allowed together with parallel reading from replicas feature");
Block header;
ASTPtr query_ast;
if (local_context->getSettingsRef().allow_experimental_analyzer)
{
StorageID remote_storage_id = StorageID::createEmpty();
if (!remote_table_function_ptr)
remote_storage_id = StorageID{remote_database, remote_table};
auto query_tree_distributed = buildQueryTreeDistributed(query_info,
storage_snapshot,
remote_storage_id,
remote_table_function_ptr);
query_ast = queryNodeToSelectQuery(query_tree_distributed);
header = InterpreterSelectQueryAnalyzer::getSampleBlock(query_ast, local_context, SelectQueryOptions(processed_stage).analyze());
}
else
{
header =
InterpreterSelectQuery(query_info.query, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
query_ast = query_info.query;
}
2023-02-03 13:34:18 +00:00
const auto & modified_query_ast = ClusterProxy::rewriteSelectQuery(
2023-02-05 12:20:10 +00:00
local_context, query_ast,
remote_database, remote_table, remote_table_function_ptr);
/// Return directly (with correct header) if no shard to query.
if (query_info.getCluster()->getShardsInfo().empty())
{
Pipe pipe(std::make_shared<NullSource>(header));
auto read_from_pipe = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
read_from_pipe->setStepDescription("Read from NullSource (Distributed)");
query_plan.addStep(std::move(read_from_pipe));
return;
}
2021-07-15 16:15:16 +00:00
StorageID main_table = StorageID::createEmpty();
if (!remote_table_function_ptr)
main_table = StorageID{remote_database, remote_table};
const auto & snapshot_data = assert_cast<const SnapshotData &>(*storage_snapshot->data);
2021-07-15 16:15:16 +00:00
ClusterProxy::SelectStreamFactory select_stream_factory =
ClusterProxy::SelectStreamFactory(
header,
snapshot_data.objects_by_shard,
storage_snapshot,
processed_stage);
2015-11-06 17:44:01 +00:00
auto settings = local_context->getSettingsRef();
2023-01-19 08:13:59 +00:00
ClusterProxy::AdditionalShardFilterGenerator additional_shard_filter_generator;
2023-03-03 15:14:49 +00:00
if (query_info.use_custom_key)
{
2023-03-03 15:14:49 +00:00
if (auto custom_key_ast = parseCustomKeyForTable(settings.parallel_replicas_custom_key, *local_context))
{
2023-01-24 10:46:47 +00:00
if (query_info.getCluster()->getShardCount() == 1)
{
// we are reading from single shard with multiple replicas but didn't transform replicas
// into virtual shards with custom_key set
throw Exception(ErrorCodes::LOGICAL_ERROR, "Replicas weren't transformed into virtual shards");
}
additional_shard_filter_generator =
[&, custom_key_ast = std::move(custom_key_ast), shard_count = query_info.cluster->getShardCount()](uint64_t shard_num) -> ASTPtr
{
return getCustomKeyFilterForParallelReplica(
shard_count, shard_num - 1, custom_key_ast, settings.parallel_replicas_custom_key_filter_type, *this, local_context);
};
}
}
2023-02-03 13:34:18 +00:00
ClusterProxy::executeQuery(
query_plan, header, processed_stage,
main_table, remote_table_function_ptr,
select_stream_factory, log, modified_query_ast,
local_context, query_info,
sharding_key_expr, sharding_key_column_name,
query_info.cluster, additional_shard_filter_generator);
/// This is a bug, it is possible only when there is no shards to query, and this is handled earlier.
if (!query_plan.isInitialized())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline is not initialized");
2012-05-21 20:38:34 +00:00
}
2021-07-23 14:25:35 +00:00
SinkToStoragePtr StorageDistributed::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
auto cluster = getCluster();
const auto & settings = local_context->getSettingsRef();
/// Ban an attempt to make async insert into the table belonging to DatabaseMemory
if (!storage_policy && !owned_cluster && !settings.insert_distributed_sync && !settings.insert_shard_id)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Storage {} must have own data directory to enable asynchronous inserts",
getName());
}
auto shard_num = cluster->getLocalShardCount() + cluster->getRemoteShardCount();
/// If sharding key is not specified, then you can only write to a shard containing only one shard
if (!settings.insert_shard_id && !settings.insert_distributed_one_random_shard && !has_sharding_key && shard_num >= 2)
{
throw Exception(ErrorCodes::STORAGE_REQUIRES_PARAMETER,
"Method write is not supported by storage {} with more than one shard and no sharding key provided", getName());
}
if (settings.insert_shard_id && settings.insert_shard_id > shard_num)
{
throw Exception(ErrorCodes::INVALID_SHARD_ID, "Shard id should be range from 1 to shard number");
}
2014-08-21 12:07:29 +00:00
/// Force sync insertion if it is remote() table function
bool insert_sync = settings.insert_distributed_sync || settings.insert_shard_id || owned_cluster;
auto timeout = settings.insert_distributed_timeout;
Names columns_to_send;
if (settings.insert_allow_materialized_columns)
columns_to_send = metadata_snapshot->getSampleBlock().getNames();
else
columns_to_send = metadata_snapshot->getSampleBlockNonMaterialized().getNames();
2022-05-09 19:13:02 +00:00
/// DistributedSink will not own cluster, but will own ConnectionPools of the cluster
2021-07-23 14:25:35 +00:00
return std::make_shared<DistributedSink>(
local_context, *this, metadata_snapshot, cluster, insert_sync, timeout,
StorageID{remote_database, remote_table}, columns_to_send);
}
std::optional<QueryPipeline> StorageDistributed::distributedWriteBetweenDistributedTables(const StorageDistributed & src_distributed, const ASTInsertQuery & query, ContextPtr local_context) const
{
const auto & settings = local_context->getSettingsRef();
auto new_query = std::dynamic_pointer_cast<ASTInsertQuery>(query.clone());
/// Unwrap view() function.
if (src_distributed.remote_table_function_ptr)
{
const TableFunctionPtr src_table_function =
TableFunctionFactory::instance().get(src_distributed.remote_table_function_ptr, local_context);
const TableFunctionView * view_function =
assert_cast<const TableFunctionView *>(src_table_function.get());
new_query->select = view_function->getSelectQuery().clone();
}
else
{
const auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();
select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>();
auto * select = query.select->as<ASTSelectWithUnionQuery &>().list_of_selects->children.at(0)->as<ASTSelectQuery>();
auto new_select_query = std::dynamic_pointer_cast<ASTSelectQuery>(select->clone());
select_with_union_query->list_of_selects->children.push_back(new_select_query);
new_select_query->replaceDatabaseAndTable(src_distributed.getRemoteDatabaseName(), src_distributed.getRemoteTableName());
new_query->select = select_with_union_query;
}
const Cluster::AddressesWithFailover & src_addresses = src_distributed.getCluster()->getShardsAddresses();
Improvements for `parallel_distributed_insert_select` (and related) (#34728) * Add a warning if parallel_distributed_insert_select was ignored Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Respect max_distributed_depth for parallel_distributed_insert_select Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Print warning for non applied parallel_distributed_insert_select only for initial query Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Remove Cluster::getHashOfAddresses() Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Forbid parallel_distributed_insert_select for remote()/cluster() with different addresses Before it uses empty cluster name (getClusterName()) which is not correct, compare all addresses instead. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Fix max_distributed_depth check max_distributed_depth=1 must mean not more then one distributed query, not two, since max_distributed_depth=0 means no limit, and distribute_depth is 0 for the first query. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Fix INSERT INTO remote()/cluster() with parallel_distributed_insert_select Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Add a test for parallel_distributed_insert_select with cluster()/remote() Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Return <remote> instead of empty cluster name in Distributed engine Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Make user with sharding_key and w/o in remote()/cluster() identical Before with sharding_key the user was "default", while w/o it it was empty. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-03-08 14:24:39 +00:00
const Cluster::AddressesWithFailover & dst_addresses = getCluster()->getShardsAddresses();
/// Compare addresses instead of cluster name, to handle remote()/cluster().
/// (since for remote()/cluster() the getClusterName() is empty string)
if (src_addresses != dst_addresses)
{
Improvements for `parallel_distributed_insert_select` (and related) (#34728) * Add a warning if parallel_distributed_insert_select was ignored Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Respect max_distributed_depth for parallel_distributed_insert_select Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Print warning for non applied parallel_distributed_insert_select only for initial query Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Remove Cluster::getHashOfAddresses() Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Forbid parallel_distributed_insert_select for remote()/cluster() with different addresses Before it uses empty cluster name (getClusterName()) which is not correct, compare all addresses instead. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Fix max_distributed_depth check max_distributed_depth=1 must mean not more then one distributed query, not two, since max_distributed_depth=0 means no limit, and distribute_depth is 0 for the first query. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Fix INSERT INTO remote()/cluster() with parallel_distributed_insert_select Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Add a test for parallel_distributed_insert_select with cluster()/remote() Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Return <remote> instead of empty cluster name in Distributed engine Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Make user with sharding_key and w/o in remote()/cluster() identical Before with sharding_key the user was "default", while w/o it it was empty. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-03-08 14:24:39 +00:00
/// The warning should be produced only for root queries,
/// since in case of parallel_distributed_insert_select=1,
/// it will produce warning for the rewritten insert,
/// since destination table is still Distributed there.
if (local_context->getClientInfo().distributed_depth == 0)
{
LOG_WARNING(log,
"Parallel distributed INSERT SELECT is not possible "
"(source cluster={} ({} addresses), destination cluster={} ({} addresses))",
src_distributed.getClusterName(),
Improvements for `parallel_distributed_insert_select` (and related) (#34728) * Add a warning if parallel_distributed_insert_select was ignored Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Respect max_distributed_depth for parallel_distributed_insert_select Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Print warning for non applied parallel_distributed_insert_select only for initial query Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Remove Cluster::getHashOfAddresses() Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Forbid parallel_distributed_insert_select for remote()/cluster() with different addresses Before it uses empty cluster name (getClusterName()) which is not correct, compare all addresses instead. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Fix max_distributed_depth check max_distributed_depth=1 must mean not more then one distributed query, not two, since max_distributed_depth=0 means no limit, and distribute_depth is 0 for the first query. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Fix INSERT INTO remote()/cluster() with parallel_distributed_insert_select Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Add a test for parallel_distributed_insert_select with cluster()/remote() Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Return <remote> instead of empty cluster name in Distributed engine Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Make user with sharding_key and w/o in remote()/cluster() identical Before with sharding_key the user was "default", while w/o it it was empty. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-03-08 14:24:39 +00:00
src_addresses.size(),
getClusterName(),
dst_addresses.size());
}
2022-05-20 19:49:31 +00:00
return {};
}
if (settings.parallel_distributed_insert_select == PARALLEL_DISTRIBUTED_INSERT_SELECT_ALL)
{
new_query->table_id = StorageID(getRemoteDatabaseName(), getRemoteTableName());
Improvements for `parallel_distributed_insert_select` (and related) (#34728) * Add a warning if parallel_distributed_insert_select was ignored Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Respect max_distributed_depth for parallel_distributed_insert_select Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Print warning for non applied parallel_distributed_insert_select only for initial query Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Remove Cluster::getHashOfAddresses() Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Forbid parallel_distributed_insert_select for remote()/cluster() with different addresses Before it uses empty cluster name (getClusterName()) which is not correct, compare all addresses instead. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Fix max_distributed_depth check max_distributed_depth=1 must mean not more then one distributed query, not two, since max_distributed_depth=0 means no limit, and distribute_depth is 0 for the first query. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Fix INSERT INTO remote()/cluster() with parallel_distributed_insert_select Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Add a test for parallel_distributed_insert_select with cluster()/remote() Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Return <remote> instead of empty cluster name in Distributed engine Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Make user with sharding_key and w/o in remote()/cluster() identical Before with sharding_key the user was "default", while w/o it it was empty. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-03-08 14:24:39 +00:00
/// Reset table function for INSERT INTO remote()/cluster()
new_query->table_function.reset();
}
const auto & cluster = getCluster();
const auto & shards_info = cluster->getShardsInfo();
String new_query_str;
{
WriteBufferFromOwnString buf;
IAST::FormatSettings ast_format_settings(buf, /*one_line*/ true);
ast_format_settings.always_quote_identifiers = true;
new_query->IAST::format(ast_format_settings);
new_query_str = buf.str();
}
QueryPipeline pipeline;
Improvements for `parallel_distributed_insert_select` (and related) (#34728) * Add a warning if parallel_distributed_insert_select was ignored Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Respect max_distributed_depth for parallel_distributed_insert_select Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Print warning for non applied parallel_distributed_insert_select only for initial query Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Remove Cluster::getHashOfAddresses() Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Forbid parallel_distributed_insert_select for remote()/cluster() with different addresses Before it uses empty cluster name (getClusterName()) which is not correct, compare all addresses instead. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Fix max_distributed_depth check max_distributed_depth=1 must mean not more then one distributed query, not two, since max_distributed_depth=0 means no limit, and distribute_depth is 0 for the first query. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Fix INSERT INTO remote()/cluster() with parallel_distributed_insert_select Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Add a test for parallel_distributed_insert_select with cluster()/remote() Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Return <remote> instead of empty cluster name in Distributed engine Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Make user with sharding_key and w/o in remote()/cluster() identical Before with sharding_key the user was "default", while w/o it it was empty. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-03-08 14:24:39 +00:00
ContextMutablePtr query_context = Context::createCopy(local_context);
++query_context->getClientInfo().distributed_depth;
2021-06-15 19:55:21 +00:00
for (size_t shard_index : collections::range(0, shards_info.size()))
{
const auto & shard_info = shards_info[shard_index];
if (shard_info.isLocal())
{
Improvements for `parallel_distributed_insert_select` (and related) (#34728) * Add a warning if parallel_distributed_insert_select was ignored Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Respect max_distributed_depth for parallel_distributed_insert_select Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Print warning for non applied parallel_distributed_insert_select only for initial query Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Remove Cluster::getHashOfAddresses() Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Forbid parallel_distributed_insert_select for remote()/cluster() with different addresses Before it uses empty cluster name (getClusterName()) which is not correct, compare all addresses instead. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Fix max_distributed_depth check max_distributed_depth=1 must mean not more then one distributed query, not two, since max_distributed_depth=0 means no limit, and distribute_depth is 0 for the first query. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Fix INSERT INTO remote()/cluster() with parallel_distributed_insert_select Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Add a test for parallel_distributed_insert_select with cluster()/remote() Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Return <remote> instead of empty cluster name in Distributed engine Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Make user with sharding_key and w/o in remote()/cluster() identical Before with sharding_key the user was "default", while w/o it it was empty. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-03-08 14:24:39 +00:00
InterpreterInsertQuery interpreter(new_query, query_context);
2022-05-20 19:49:31 +00:00
pipeline.addCompletedPipeline(interpreter.execute().pipeline);
}
else
{
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
auto connections = shard_info.pool->getMany(timeouts, &settings, PoolMode::GET_ONE);
if (connections.empty() || connections.front().isNull())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected exactly one connection for shard {}",
shard_info.shard_num);
/// INSERT SELECT query returns empty block
2021-07-14 13:17:30 +00:00
auto remote_query_executor
= std::make_shared<RemoteQueryExecutor>(std::move(connections), new_query_str, Block{}, query_context);
2022-05-20 19:49:31 +00:00
QueryPipeline remote_pipeline(std::make_shared<RemoteSource>(remote_query_executor, false, settings.async_socket_for_remote));
remote_pipeline.complete(std::make_shared<EmptySink>(remote_query_executor->getHeader()));
pipeline.addCompletedPipeline(std::move(remote_pipeline));
}
}
2022-05-20 19:49:31 +00:00
return pipeline;
}
std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStorage(const IStorageCluster & src_storage_cluster, const ASTInsertQuery & query, ContextPtr local_context) const
{
const auto & settings = local_context->getSettingsRef();
auto & select = query.select->as<ASTSelectWithUnionQuery &>();
/// Select query is needed for pruining on virtual columns
auto extension = src_storage_cluster.getTaskIteratorExtension(
select.list_of_selects->children.at(0)->as<ASTSelectQuery>()->clone(),
local_context);
auto dst_cluster = getCluster();
auto new_query = std::dynamic_pointer_cast<ASTInsertQuery>(query.clone());
if (settings.parallel_distributed_insert_select == PARALLEL_DISTRIBUTED_INSERT_SELECT_ALL)
{
new_query->table_id = StorageID(getRemoteDatabaseName(), getRemoteTableName());
/// Reset table function for INSERT INTO remote()/cluster()
new_query->table_function.reset();
}
String new_query_str;
{
WriteBufferFromOwnString buf;
IAST::FormatSettings ast_format_settings(buf, /*one_line*/ true);
ast_format_settings.always_quote_identifiers = true;
new_query->IAST::format(ast_format_settings);
new_query_str = buf.str();
}
QueryPipeline pipeline;
ContextMutablePtr query_context = Context::createCopy(local_context);
++query_context->getClientInfo().distributed_depth;
/// Here we take addresses from destination cluster and assume source table exists on these nodes
for (const auto & replicas : getCluster()->getShardsAddresses())
{
/// There will be only one replica, because we consider each replica as a shard
for (const auto & node : replicas)
{
auto connection = std::make_shared<Connection>(
node.host_name, node.port, query_context->getGlobalContext()->getCurrentDatabase(),
node.user, node.password, node.quota_key, node.cluster, node.cluster_secret,
"ParallelInsertSelectInititiator",
node.compression,
node.secure
);
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
connection,
new_query_str,
Block{},
query_context,
/*throttler=*/nullptr,
Scalars{},
Tables{},
QueryProcessingStage::Complete,
extension);
QueryPipeline remote_pipeline(std::make_shared<RemoteSource>(remote_query_executor, false, settings.async_socket_for_remote));
remote_pipeline.complete(std::make_shared<EmptySink>(remote_query_executor->getHeader()));
pipeline.addCompletedPipeline(std::move(remote_pipeline));
}
}
return pipeline;
}
std::optional<QueryPipeline> StorageDistributed::distributedWrite(const ASTInsertQuery & query, ContextPtr local_context)
{
const Settings & settings = local_context->getSettingsRef();
if (settings.max_distributed_depth && local_context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception(ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH, "Maximum distributed depth exceeded");
auto & select = query.select->as<ASTSelectWithUnionQuery &>();
StoragePtr src_storage;
/// Distributed write only works in the most trivial case INSERT ... SELECT
/// without any unions or joins on the right side
if (select.list_of_selects->children.size() == 1)
{
if (auto * select_query = select.list_of_selects->children.at(0)->as<ASTSelectQuery>())
{
JoinedTables joined_tables(Context::createCopy(local_context), *select_query);
if (joined_tables.tablesCount() == 1)
{
src_storage = joined_tables.getLeftTableStorage();
}
}
}
if (!src_storage)
return {};
if (auto src_distributed = std::dynamic_pointer_cast<StorageDistributed>(src_storage))
{
return distributedWriteBetweenDistributedTables(*src_distributed, query, local_context);
}
if (auto src_storage_cluster = std::dynamic_pointer_cast<IStorageCluster>(src_storage))
{
return distributedWriteFromClusterStorage(*src_storage_cluster, query, local_context);
}
if (local_context->getClientInfo().distributed_depth == 0)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parallel distributed INSERT SELECT is not possible. "\
"Reason: distributed reading is supported only from Distributed engine "
"or *Cluster table functions, but got {} storage", src_storage->getName());
}
return {};
}
void StorageDistributed::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const
2013-09-23 12:01:19 +00:00
{
auto name_deps = getDependentViewsByColumn(local_context);
2019-12-26 18:17:05 +00:00
for (const auto & command : commands)
{
if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::MODIFY_COLUMN
&& command.type != AlterCommand::Type::DROP_COLUMN && command.type != AlterCommand::Type::COMMENT_COLUMN
&& command.type != AlterCommand::Type::RENAME_COLUMN && command.type != AlterCommand::Type::COMMENT_TABLE)
2016-05-13 21:08:19 +00:00
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Alter of type '{}' is not supported by storage {}",
command.type, getName());
if (command.type == AlterCommand::DROP_COLUMN && !command.clear)
{
2021-02-28 07:42:08 +00:00
const auto & deps_mv = name_deps[command.column_name];
if (!deps_mv.empty())
{
2023-01-17 00:19:44 +00:00
throw Exception(ErrorCodes::ALTER_OF_COLUMN_IS_FORBIDDEN,
"Trying to ALTER DROP column {} which is referenced by materialized view {}",
backQuoteIfNeed(command.column_name), toString(deps_mv));
}
}
2019-12-26 18:17:05 +00:00
}
}
2021-10-25 17:49:49 +00:00
void StorageDistributed::alter(const AlterCommands & params, ContextPtr local_context, AlterLockHolder &)
2013-09-23 12:01:19 +00:00
{
auto table_id = getStorageID();
checkAlterIsPossible(params, local_context);
2020-06-09 17:28:29 +00:00
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
params.apply(new_metadata, local_context);
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata);
setInMemoryMetadata(new_metadata);
2013-09-23 12:01:19 +00:00
}
void StorageDistributed::initializeFromDisk()
{
2020-07-23 14:10:48 +00:00
if (!storage_policy)
return;
const auto & disks = data_volume->getDisks();
2021-06-27 15:22:34 +00:00
/// Make initialization for large number of disks parallel.
ThreadPool pool(disks.size());
for (const DiskPtr & disk : disks)
{
pool.scheduleOrThrowOnError([&]()
{
initializeDirectoryQueuesForDisk(disk);
});
}
pool.wait();
const auto & paths = getDataPaths();
std::vector<UInt64> last_increment(paths.size());
for (size_t i = 0; i < paths.size(); ++i)
{
pool.scheduleOrThrowOnError([&, i]()
{
last_increment[i] = getMaximumFileNumber(paths[i]);
});
}
pool.wait();
for (const auto inc : last_increment)
{
if (inc > file_names_increment.value)
file_names_increment.value.store(inc);
}
2020-05-23 22:24:01 +00:00
LOG_DEBUG(log, "Auto-increment is {}", file_names_increment.value);
}
void StorageDistributed::shutdown()
{
monitors_blocker.cancelForever();
std::lock_guard lock(cluster_nodes_mutex);
Fix DROP TABLE for Distributed (racy with INSERT) <details> ``` drop() on T1275: 0 DB::StorageDistributed::drop (this=0x7f9ed34f0000) at ../contrib/libcxx/include/__hash_table:966 1 0x000000000d557242 in DB::DatabaseOnDisk::dropTable (this=0x7f9fc22706d8, context=..., table_name=...) at ../contrib/libcxx/include/new:340 2 0x000000000d6fcf7c in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f9e42560dc0, query=...) at ../contrib/libcxx/include/memory:3826 3 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f9e42560dc0) at ../src/Interpreters/InterpreterDropQuery.cpp:50 4 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>, stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420 5 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>, may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487 6 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254 7 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326 8 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000) at ../contrib/poco/Net/src/TCPServerConnection.cpp:43 9 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00) at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114 10 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199 11 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>) at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401 12 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477 13 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95 StorageDistributedDirectoryMonitor on T166: 0 DB::StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor (this=0x7f9ea7ab1400, storage_=..., path_=..., pool_=..., monitor_blocker_=..., bg_pool_=...) at ../src/Storages/Distributed/DirectoryMonitor.cpp:81 1 0x000000000dbf684e in std::__1::make_unique<> () at ../contrib/libcxx/include/memory:3474 2 DB::StorageDistributed::requireDirectoryMonitor (this=0x7f9ed34f0000, disk=..., name=...) at ../src/Storages/StorageDistributed.cpp:682 3 0x000000000de3d5fa in DB::DistributedBlockOutputStream::writeToShard (this=this@entry=0x7f9ed39c7418, block=..., dir_names=...) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:634 4 0x000000000de3e214 in DB::DistributedBlockOutputStream::writeAsyncImpl (this=this@entry=0x7f9ed39c7418, block=..., shard_id=shard_id@entry=79) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:539 5 0x000000000de3e47b in DB::DistributedBlockOutputStream::writeSplitAsync (this=this@entry=0x7f9ed39c7418, block=...) at ../contrib/libcxx/include/vector:1546 6 0x000000000de3eab0 in DB::DistributedBlockOutputStream::writeAsync (block=..., this=0x7f9ed39c7418) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:141 7 DB::DistributedBlockOutputStream::write (this=0x7f9ed39c7418, block=...) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:135 8 0x000000000d73b376 in DB::PushingToViewsBlockOutputStream::write (this=this@entry=0x7f9ea7a8cf58, block=...) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:157 9 0x000000000d7853eb in DB::AddingDefaultBlockOutputStream::write (this=0x7f9ed383d118, block=...) at ../contrib/libcxx/include/memory:3826 10 0x000000000d740790 in DB::SquashingBlockOutputStream::write (this=0x7f9ed383de18, block=...) at ../contrib/libcxx/include/memory:3826 11 0x000000000d68c308 in DB::CountingBlockOutputStream::write (this=0x7f9ea7ac6d60, block=...) at ../contrib/libcxx/include/memory:3826 12 0x000000000ddab449 in DB::StorageBuffer::writeBlockToDestination (this=this@entry=0x7f9fbd56a000, block=..., table=...) at ../src/Storages/StorageBuffer.cpp:747 13 0x000000000ddabfa6 in DB::StorageBuffer::flushBuffer (this=this@entry=0x7f9fbd56a000, buffer=..., check_thresholds=check_thresholds@entry=true, locked=locked@entry=false, reset_block_structure=reset_block_structure@entry=false) at ../src/Storages/StorageBuffer.cpp:661 14 0x000000000ddac415 in DB::StorageBuffer::flushAllBuffers (reset_blocks_structure=false, check_thresholds=true, this=0x7f9fbd56a000) at ../src/Storages/StorageBuffer.cpp:605 shutdown() on T1275: 0 DB::StorageDistributed::shutdown (this=0x7f9ed34f0000) at ../contrib/libcxx/include/atomic:1612 1 0x000000000d6fd938 in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f98530c79a0, query=...) at ../src/Storages/TableLockHolder.h:12 2 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f98530c79a0) at ../src/Interpreters/InterpreterDropQuery.cpp:50 3 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>, stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420 4 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>, may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487 5 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254 6 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326 7 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000) at ../contrib/poco/Net/src/TCPServerConnection.cpp:43 8 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00) at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114 9 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199 10 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>) at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401 11 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477 12 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95 ``` </details>
2020-10-26 20:01:06 +00:00
LOG_DEBUG(log, "Joining background threads for async INSERT");
cluster_nodes_data.clear();
Fix DROP TABLE for Distributed (racy with INSERT) <details> ``` drop() on T1275: 0 DB::StorageDistributed::drop (this=0x7f9ed34f0000) at ../contrib/libcxx/include/__hash_table:966 1 0x000000000d557242 in DB::DatabaseOnDisk::dropTable (this=0x7f9fc22706d8, context=..., table_name=...) at ../contrib/libcxx/include/new:340 2 0x000000000d6fcf7c in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f9e42560dc0, query=...) at ../contrib/libcxx/include/memory:3826 3 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f9e42560dc0) at ../src/Interpreters/InterpreterDropQuery.cpp:50 4 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>, stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420 5 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>, may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487 6 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254 7 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326 8 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000) at ../contrib/poco/Net/src/TCPServerConnection.cpp:43 9 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00) at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114 10 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199 11 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>) at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401 12 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477 13 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95 StorageDistributedDirectoryMonitor on T166: 0 DB::StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor (this=0x7f9ea7ab1400, storage_=..., path_=..., pool_=..., monitor_blocker_=..., bg_pool_=...) at ../src/Storages/Distributed/DirectoryMonitor.cpp:81 1 0x000000000dbf684e in std::__1::make_unique<> () at ../contrib/libcxx/include/memory:3474 2 DB::StorageDistributed::requireDirectoryMonitor (this=0x7f9ed34f0000, disk=..., name=...) at ../src/Storages/StorageDistributed.cpp:682 3 0x000000000de3d5fa in DB::DistributedBlockOutputStream::writeToShard (this=this@entry=0x7f9ed39c7418, block=..., dir_names=...) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:634 4 0x000000000de3e214 in DB::DistributedBlockOutputStream::writeAsyncImpl (this=this@entry=0x7f9ed39c7418, block=..., shard_id=shard_id@entry=79) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:539 5 0x000000000de3e47b in DB::DistributedBlockOutputStream::writeSplitAsync (this=this@entry=0x7f9ed39c7418, block=...) at ../contrib/libcxx/include/vector:1546 6 0x000000000de3eab0 in DB::DistributedBlockOutputStream::writeAsync (block=..., this=0x7f9ed39c7418) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:141 7 DB::DistributedBlockOutputStream::write (this=0x7f9ed39c7418, block=...) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:135 8 0x000000000d73b376 in DB::PushingToViewsBlockOutputStream::write (this=this@entry=0x7f9ea7a8cf58, block=...) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:157 9 0x000000000d7853eb in DB::AddingDefaultBlockOutputStream::write (this=0x7f9ed383d118, block=...) at ../contrib/libcxx/include/memory:3826 10 0x000000000d740790 in DB::SquashingBlockOutputStream::write (this=0x7f9ed383de18, block=...) at ../contrib/libcxx/include/memory:3826 11 0x000000000d68c308 in DB::CountingBlockOutputStream::write (this=0x7f9ea7ac6d60, block=...) at ../contrib/libcxx/include/memory:3826 12 0x000000000ddab449 in DB::StorageBuffer::writeBlockToDestination (this=this@entry=0x7f9fbd56a000, block=..., table=...) at ../src/Storages/StorageBuffer.cpp:747 13 0x000000000ddabfa6 in DB::StorageBuffer::flushBuffer (this=this@entry=0x7f9fbd56a000, buffer=..., check_thresholds=check_thresholds@entry=true, locked=locked@entry=false, reset_block_structure=reset_block_structure@entry=false) at ../src/Storages/StorageBuffer.cpp:661 14 0x000000000ddac415 in DB::StorageBuffer::flushAllBuffers (reset_blocks_structure=false, check_thresholds=true, this=0x7f9fbd56a000) at ../src/Storages/StorageBuffer.cpp:605 shutdown() on T1275: 0 DB::StorageDistributed::shutdown (this=0x7f9ed34f0000) at ../contrib/libcxx/include/atomic:1612 1 0x000000000d6fd938 in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f98530c79a0, query=...) at ../src/Storages/TableLockHolder.h:12 2 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f98530c79a0) at ../src/Interpreters/InterpreterDropQuery.cpp:50 3 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>, stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420 4 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>, may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487 5 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254 6 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326 7 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000) at ../contrib/poco/Net/src/TCPServerConnection.cpp:43 8 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00) at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114 9 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199 10 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>) at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401 11 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477 12 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95 ``` </details>
2020-10-26 20:01:06 +00:00
LOG_DEBUG(log, "Background threads for async INSERT joined");
}
void StorageDistributed::drop()
{
Fix DROP TABLE for Distributed (racy with INSERT) <details> ``` drop() on T1275: 0 DB::StorageDistributed::drop (this=0x7f9ed34f0000) at ../contrib/libcxx/include/__hash_table:966 1 0x000000000d557242 in DB::DatabaseOnDisk::dropTable (this=0x7f9fc22706d8, context=..., table_name=...) at ../contrib/libcxx/include/new:340 2 0x000000000d6fcf7c in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f9e42560dc0, query=...) at ../contrib/libcxx/include/memory:3826 3 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f9e42560dc0) at ../src/Interpreters/InterpreterDropQuery.cpp:50 4 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>, stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420 5 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>, may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487 6 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254 7 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326 8 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000) at ../contrib/poco/Net/src/TCPServerConnection.cpp:43 9 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00) at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114 10 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199 11 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>) at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401 12 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477 13 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95 StorageDistributedDirectoryMonitor on T166: 0 DB::StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor (this=0x7f9ea7ab1400, storage_=..., path_=..., pool_=..., monitor_blocker_=..., bg_pool_=...) at ../src/Storages/Distributed/DirectoryMonitor.cpp:81 1 0x000000000dbf684e in std::__1::make_unique<> () at ../contrib/libcxx/include/memory:3474 2 DB::StorageDistributed::requireDirectoryMonitor (this=0x7f9ed34f0000, disk=..., name=...) at ../src/Storages/StorageDistributed.cpp:682 3 0x000000000de3d5fa in DB::DistributedBlockOutputStream::writeToShard (this=this@entry=0x7f9ed39c7418, block=..., dir_names=...) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:634 4 0x000000000de3e214 in DB::DistributedBlockOutputStream::writeAsyncImpl (this=this@entry=0x7f9ed39c7418, block=..., shard_id=shard_id@entry=79) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:539 5 0x000000000de3e47b in DB::DistributedBlockOutputStream::writeSplitAsync (this=this@entry=0x7f9ed39c7418, block=...) at ../contrib/libcxx/include/vector:1546 6 0x000000000de3eab0 in DB::DistributedBlockOutputStream::writeAsync (block=..., this=0x7f9ed39c7418) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:141 7 DB::DistributedBlockOutputStream::write (this=0x7f9ed39c7418, block=...) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:135 8 0x000000000d73b376 in DB::PushingToViewsBlockOutputStream::write (this=this@entry=0x7f9ea7a8cf58, block=...) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:157 9 0x000000000d7853eb in DB::AddingDefaultBlockOutputStream::write (this=0x7f9ed383d118, block=...) at ../contrib/libcxx/include/memory:3826 10 0x000000000d740790 in DB::SquashingBlockOutputStream::write (this=0x7f9ed383de18, block=...) at ../contrib/libcxx/include/memory:3826 11 0x000000000d68c308 in DB::CountingBlockOutputStream::write (this=0x7f9ea7ac6d60, block=...) at ../contrib/libcxx/include/memory:3826 12 0x000000000ddab449 in DB::StorageBuffer::writeBlockToDestination (this=this@entry=0x7f9fbd56a000, block=..., table=...) at ../src/Storages/StorageBuffer.cpp:747 13 0x000000000ddabfa6 in DB::StorageBuffer::flushBuffer (this=this@entry=0x7f9fbd56a000, buffer=..., check_thresholds=check_thresholds@entry=true, locked=locked@entry=false, reset_block_structure=reset_block_structure@entry=false) at ../src/Storages/StorageBuffer.cpp:661 14 0x000000000ddac415 in DB::StorageBuffer::flushAllBuffers (reset_blocks_structure=false, check_thresholds=true, this=0x7f9fbd56a000) at ../src/Storages/StorageBuffer.cpp:605 shutdown() on T1275: 0 DB::StorageDistributed::shutdown (this=0x7f9ed34f0000) at ../contrib/libcxx/include/atomic:1612 1 0x000000000d6fd938 in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f98530c79a0, query=...) at ../src/Storages/TableLockHolder.h:12 2 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f98530c79a0) at ../src/Interpreters/InterpreterDropQuery.cpp:50 3 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>, stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420 4 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>, may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487 5 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254 6 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326 7 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000) at ../contrib/poco/Net/src/TCPServerConnection.cpp:43 8 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00) at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114 9 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199 10 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>) at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401 11 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477 12 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95 ``` </details>
2020-10-26 20:01:06 +00:00
// Some INSERT in-between shutdown() and drop() can call
// getDirectoryQueue() again, so call shutdown() to clear them, but
Fix DROP TABLE for Distributed (racy with INSERT) <details> ``` drop() on T1275: 0 DB::StorageDistributed::drop (this=0x7f9ed34f0000) at ../contrib/libcxx/include/__hash_table:966 1 0x000000000d557242 in DB::DatabaseOnDisk::dropTable (this=0x7f9fc22706d8, context=..., table_name=...) at ../contrib/libcxx/include/new:340 2 0x000000000d6fcf7c in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f9e42560dc0, query=...) at ../contrib/libcxx/include/memory:3826 3 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f9e42560dc0) at ../src/Interpreters/InterpreterDropQuery.cpp:50 4 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>, stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420 5 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>, may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487 6 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254 7 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326 8 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000) at ../contrib/poco/Net/src/TCPServerConnection.cpp:43 9 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00) at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114 10 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199 11 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>) at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401 12 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477 13 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95 StorageDistributedDirectoryMonitor on T166: 0 DB::StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor (this=0x7f9ea7ab1400, storage_=..., path_=..., pool_=..., monitor_blocker_=..., bg_pool_=...) at ../src/Storages/Distributed/DirectoryMonitor.cpp:81 1 0x000000000dbf684e in std::__1::make_unique<> () at ../contrib/libcxx/include/memory:3474 2 DB::StorageDistributed::requireDirectoryMonitor (this=0x7f9ed34f0000, disk=..., name=...) at ../src/Storages/StorageDistributed.cpp:682 3 0x000000000de3d5fa in DB::DistributedBlockOutputStream::writeToShard (this=this@entry=0x7f9ed39c7418, block=..., dir_names=...) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:634 4 0x000000000de3e214 in DB::DistributedBlockOutputStream::writeAsyncImpl (this=this@entry=0x7f9ed39c7418, block=..., shard_id=shard_id@entry=79) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:539 5 0x000000000de3e47b in DB::DistributedBlockOutputStream::writeSplitAsync (this=this@entry=0x7f9ed39c7418, block=...) at ../contrib/libcxx/include/vector:1546 6 0x000000000de3eab0 in DB::DistributedBlockOutputStream::writeAsync (block=..., this=0x7f9ed39c7418) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:141 7 DB::DistributedBlockOutputStream::write (this=0x7f9ed39c7418, block=...) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:135 8 0x000000000d73b376 in DB::PushingToViewsBlockOutputStream::write (this=this@entry=0x7f9ea7a8cf58, block=...) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:157 9 0x000000000d7853eb in DB::AddingDefaultBlockOutputStream::write (this=0x7f9ed383d118, block=...) at ../contrib/libcxx/include/memory:3826 10 0x000000000d740790 in DB::SquashingBlockOutputStream::write (this=0x7f9ed383de18, block=...) at ../contrib/libcxx/include/memory:3826 11 0x000000000d68c308 in DB::CountingBlockOutputStream::write (this=0x7f9ea7ac6d60, block=...) at ../contrib/libcxx/include/memory:3826 12 0x000000000ddab449 in DB::StorageBuffer::writeBlockToDestination (this=this@entry=0x7f9fbd56a000, block=..., table=...) at ../src/Storages/StorageBuffer.cpp:747 13 0x000000000ddabfa6 in DB::StorageBuffer::flushBuffer (this=this@entry=0x7f9fbd56a000, buffer=..., check_thresholds=check_thresholds@entry=true, locked=locked@entry=false, reset_block_structure=reset_block_structure@entry=false) at ../src/Storages/StorageBuffer.cpp:661 14 0x000000000ddac415 in DB::StorageBuffer::flushAllBuffers (reset_blocks_structure=false, check_thresholds=true, this=0x7f9fbd56a000) at ../src/Storages/StorageBuffer.cpp:605 shutdown() on T1275: 0 DB::StorageDistributed::shutdown (this=0x7f9ed34f0000) at ../contrib/libcxx/include/atomic:1612 1 0x000000000d6fd938 in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f98530c79a0, query=...) at ../src/Storages/TableLockHolder.h:12 2 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f98530c79a0) at ../src/Interpreters/InterpreterDropQuery.cpp:50 3 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>, stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420 4 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>, may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487 5 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254 6 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326 7 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000) at ../contrib/poco/Net/src/TCPServerConnection.cpp:43 8 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00) at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114 9 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199 10 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>) at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401 11 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477 12 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95 ``` </details>
2020-10-26 20:01:06 +00:00
// when the drop() (this function) executed none of INSERT is allowed in
// parallel.
//
// And second time shutdown() should be fast, since none of
// DirectoryMonitor should do anything, because ActionBlocker is canceled
// (in shutdown()).
shutdown();
2022-04-17 23:02:49 +00:00
// Distributed table without sharding_key does not allows INSERTs
if (relative_data_path.empty())
return;
LOG_DEBUG(log, "Removing pending blocks for async INSERT from filesystem on DROP TABLE");
auto disks = data_volume->getDisks();
for (const auto & disk : disks)
{
if (!disk->exists(relative_data_path))
{
LOG_INFO(log, "Path {} is already removed from disk {}", relative_data_path, disk->getName());
continue;
}
disk->removeRecursive(relative_data_path);
}
LOG_DEBUG(log, "Removed");
}
Strings StorageDistributed::getDataPaths() const
{
Strings paths;
if (relative_data_path.empty())
return paths;
for (const DiskPtr & disk : data_volume->getDisks())
paths.push_back(disk->getPath() + relative_data_path);
return paths;
}
void StorageDistributed::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)
2018-04-21 00:35:20 +00:00
{
2018-06-09 15:48:22 +00:00
std::lock_guard lock(cluster_nodes_mutex);
2018-04-21 00:35:20 +00:00
LOG_DEBUG(log, "Removing pending blocks for async INSERT from filesystem on TRUNCATE TABLE");
2018-06-09 15:48:22 +00:00
for (auto it = cluster_nodes_data.begin(); it != cluster_nodes_data.end();)
2018-04-21 00:35:20 +00:00
{
it->second.directory_monitor->shutdownAndDropAllData();
2018-06-09 15:48:22 +00:00
it = cluster_nodes_data.erase(it);
2018-04-21 00:35:20 +00:00
}
LOG_DEBUG(log, "Removed");
2018-04-21 00:35:20 +00:00
}
StoragePolicyPtr StorageDistributed::getStoragePolicy() const
{
2020-07-23 14:10:48 +00:00
return storage_policy;
}
void StorageDistributed::initializeDirectoryQueuesForDisk(const DiskPtr & disk)
{
const std::string path(disk->getPath() + relative_data_path);
2021-04-27 00:05:43 +00:00
fs::create_directories(path);
std::filesystem::directory_iterator begin(path);
std::filesystem::directory_iterator end;
for (auto it = begin; it != end; ++it)
{
const auto & dir_path = it->path();
if (std::filesystem::is_directory(dir_path))
{
/// Created by DistributedSink
const auto & tmp_path = dir_path / "tmp";
if (std::filesystem::is_directory(tmp_path) && std::filesystem::is_empty(tmp_path))
std::filesystem::remove(tmp_path);
const auto & broken_path = dir_path / "broken";
if (std::filesystem::is_directory(broken_path) && std::filesystem::is_empty(broken_path))
std::filesystem::remove(broken_path);
if (std::filesystem::is_empty(dir_path))
{
2020-11-22 17:13:40 +00:00
LOG_DEBUG(log, "Removing {} (used for async INSERT into Distributed)", dir_path.string());
2022-05-09 19:13:02 +00:00
/// Will be created by DistributedSink on demand.
std::filesystem::remove(dir_path);
}
else
{
getDirectoryQueue(disk, dir_path.filename().string());
}
}
}
}
DistributedAsyncInsertDirectoryQueue & StorageDistributed::getDirectoryQueue(const DiskPtr & disk, const std::string & name)
{
const std::string & disk_path = disk->getPath();
const std::string key(disk_path + name);
Fix race in Distributed table startup Before this patch it was possible to have multiple directory monitors for the same directory, one from the INSERT context, another one on storage startup(). Here are an example of logs for this scenario: 2022.12.07 12:12:27.552485 [ 39925 ] {a47fcb32-4f44-4dbd-94fe-0070d4ea0f6b} <Debug> DDLWorker: Executed query: DETACH TABLE inc.dist_urls_in ... 2022.12.07 12:12:33.228449 [ 4408 ] {20c761d3-a46d-417b-9fcd-89a8919dd1fe} <Debug> executeQuery: (from 0.0.0.0:0, user: ) /* ddl_entry=query-0000089229 */ ATTACH TABLE inc.dist_urls_in (stage: Complete) ... this is the DirectoryMonitor created from the context of INSERT for the old StoragePtr that had not been destroyed yet (becase of "was 1" this can be done only from the context of INSERT) ... 2022.12.07 12:12:35.556048 [ 39536 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Files set to 173 (was 1) 2022.12.07 12:12:35.556078 [ 39536 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Bytes set to 29750181 (was 71004) 2022.12.07 12:12:35.562716 [ 39536 ] {} <Trace> Connection (i13.ch:9000): Connected to ClickHouse server version 22.10.1. 2022.12.07 12:12:35.562750 [ 39536 ] {} <Debug> inc.dist_urls_in.DirectoryMonitor: Sending a batch of 10 files to i13.ch:9000 (0.00 rows, 0.00 B bytes). ... this is the DirectoryMonitor that created during ATTACH ... 2022.12.07 12:12:35.802080 [ 39265 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Files set to 173 (was 0) 2022.12.07 12:12:35.802107 [ 39265 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Bytes set to 29750181 (was 0) 2022.12.07 12:12:35.834216 [ 39265 ] {} <Debug> inc.dist_urls_in.DirectoryMonitor: Sending a batch of 10 files to i13.ch:9000 (0.00 rows, 0.00 B bytes). ... 2022.12.07 12:12:38.532627 [ 39536 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Sent a batch of 10 files (took 2976 ms). ... 2022.12.07 12:12:38.601051 [ 39265 ] {} <Error> inc.dist_urls_in.DirectoryMonitor: std::exception. Code: 1001, type: std::__1::__fs::filesystem::filesystem_error, e.what() = filesystem error: in file_size: No such file or directory ["/data6/clickhouse/data/inc/dist_urls_in/shard13_replica1/66827403.bin"], Stack trace (when copying this message, always include the lines below): ... 2022.12.07 12:12:54.132837 [ 4408 ] {20c761d3-a46d-417b-9fcd-89a8919dd1fe} <Debug> DDLWorker: Executed query: ATTACH TABLE inc.dist_urls_in And eventually both monitors (for a short period of time, one replaces another) are trying to process the same batch (current_batch.txt), and one of them fails because such file had been already removed. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2023-01-05 19:46:09 +00:00
std::lock_guard lock(cluster_nodes_mutex);
auto & node_data = cluster_nodes_data[key];
if (!node_data.directory_monitor)
{
node_data.connection_pool = DistributedAsyncInsertDirectoryQueue::createPool(name, *this);
node_data.directory_monitor = std::make_unique<DistributedAsyncInsertDirectoryQueue>(
*this, disk, relative_data_path + name,
Fix race in Distributed table startup Before this patch it was possible to have multiple directory monitors for the same directory, one from the INSERT context, another one on storage startup(). Here are an example of logs for this scenario: 2022.12.07 12:12:27.552485 [ 39925 ] {a47fcb32-4f44-4dbd-94fe-0070d4ea0f6b} <Debug> DDLWorker: Executed query: DETACH TABLE inc.dist_urls_in ... 2022.12.07 12:12:33.228449 [ 4408 ] {20c761d3-a46d-417b-9fcd-89a8919dd1fe} <Debug> executeQuery: (from 0.0.0.0:0, user: ) /* ddl_entry=query-0000089229 */ ATTACH TABLE inc.dist_urls_in (stage: Complete) ... this is the DirectoryMonitor created from the context of INSERT for the old StoragePtr that had not been destroyed yet (becase of "was 1" this can be done only from the context of INSERT) ... 2022.12.07 12:12:35.556048 [ 39536 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Files set to 173 (was 1) 2022.12.07 12:12:35.556078 [ 39536 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Bytes set to 29750181 (was 71004) 2022.12.07 12:12:35.562716 [ 39536 ] {} <Trace> Connection (i13.ch:9000): Connected to ClickHouse server version 22.10.1. 2022.12.07 12:12:35.562750 [ 39536 ] {} <Debug> inc.dist_urls_in.DirectoryMonitor: Sending a batch of 10 files to i13.ch:9000 (0.00 rows, 0.00 B bytes). ... this is the DirectoryMonitor that created during ATTACH ... 2022.12.07 12:12:35.802080 [ 39265 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Files set to 173 (was 0) 2022.12.07 12:12:35.802107 [ 39265 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Bytes set to 29750181 (was 0) 2022.12.07 12:12:35.834216 [ 39265 ] {} <Debug> inc.dist_urls_in.DirectoryMonitor: Sending a batch of 10 files to i13.ch:9000 (0.00 rows, 0.00 B bytes). ... 2022.12.07 12:12:38.532627 [ 39536 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Sent a batch of 10 files (took 2976 ms). ... 2022.12.07 12:12:38.601051 [ 39265 ] {} <Error> inc.dist_urls_in.DirectoryMonitor: std::exception. Code: 1001, type: std::__1::__fs::filesystem::filesystem_error, e.what() = filesystem error: in file_size: No such file or directory ["/data6/clickhouse/data/inc/dist_urls_in/shard13_replica1/66827403.bin"], Stack trace (when copying this message, always include the lines below): ... 2022.12.07 12:12:54.132837 [ 4408 ] {20c761d3-a46d-417b-9fcd-89a8919dd1fe} <Debug> DDLWorker: Executed query: ATTACH TABLE inc.dist_urls_in And eventually both monitors (for a short period of time, one replaces another) are trying to process the same batch (current_batch.txt), and one of them fails because such file had been already removed. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2023-01-05 19:46:09 +00:00
node_data.connection_pool,
monitors_blocker,
getContext()->getDistributedSchedulePool());
}
Fix race in Distributed table startup Before this patch it was possible to have multiple directory monitors for the same directory, one from the INSERT context, another one on storage startup(). Here are an example of logs for this scenario: 2022.12.07 12:12:27.552485 [ 39925 ] {a47fcb32-4f44-4dbd-94fe-0070d4ea0f6b} <Debug> DDLWorker: Executed query: DETACH TABLE inc.dist_urls_in ... 2022.12.07 12:12:33.228449 [ 4408 ] {20c761d3-a46d-417b-9fcd-89a8919dd1fe} <Debug> executeQuery: (from 0.0.0.0:0, user: ) /* ddl_entry=query-0000089229 */ ATTACH TABLE inc.dist_urls_in (stage: Complete) ... this is the DirectoryMonitor created from the context of INSERT for the old StoragePtr that had not been destroyed yet (becase of "was 1" this can be done only from the context of INSERT) ... 2022.12.07 12:12:35.556048 [ 39536 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Files set to 173 (was 1) 2022.12.07 12:12:35.556078 [ 39536 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Bytes set to 29750181 (was 71004) 2022.12.07 12:12:35.562716 [ 39536 ] {} <Trace> Connection (i13.ch:9000): Connected to ClickHouse server version 22.10.1. 2022.12.07 12:12:35.562750 [ 39536 ] {} <Debug> inc.dist_urls_in.DirectoryMonitor: Sending a batch of 10 files to i13.ch:9000 (0.00 rows, 0.00 B bytes). ... this is the DirectoryMonitor that created during ATTACH ... 2022.12.07 12:12:35.802080 [ 39265 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Files set to 173 (was 0) 2022.12.07 12:12:35.802107 [ 39265 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Bytes set to 29750181 (was 0) 2022.12.07 12:12:35.834216 [ 39265 ] {} <Debug> inc.dist_urls_in.DirectoryMonitor: Sending a batch of 10 files to i13.ch:9000 (0.00 rows, 0.00 B bytes). ... 2022.12.07 12:12:38.532627 [ 39536 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Sent a batch of 10 files (took 2976 ms). ... 2022.12.07 12:12:38.601051 [ 39265 ] {} <Error> inc.dist_urls_in.DirectoryMonitor: std::exception. Code: 1001, type: std::__1::__fs::filesystem::filesystem_error, e.what() = filesystem error: in file_size: No such file or directory ["/data6/clickhouse/data/inc/dist_urls_in/shard13_replica1/66827403.bin"], Stack trace (when copying this message, always include the lines below): ... 2022.12.07 12:12:54.132837 [ 4408 ] {20c761d3-a46d-417b-9fcd-89a8919dd1fe} <Debug> DDLWorker: Executed query: ATTACH TABLE inc.dist_urls_in And eventually both monitors (for a short period of time, one replaces another) are trying to process the same batch (current_batch.txt), and one of them fails because such file had been already removed. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2023-01-05 19:46:09 +00:00
return *node_data.directory_monitor;
}
std::vector<DistributedAsyncInsertDirectoryQueue::Status> StorageDistributed::getDirectoryQueueStatuses() const
{
std::vector<DistributedAsyncInsertDirectoryQueue::Status> statuses;
std::lock_guard lock(cluster_nodes_mutex);
2020-06-06 15:57:52 +00:00
statuses.reserve(cluster_nodes_data.size());
2020-06-04 17:23:46 +00:00
for (const auto & node : cluster_nodes_data)
statuses.push_back(node.second.directory_monitor->getStatus());
return statuses;
}
std::optional<UInt64> StorageDistributed::totalBytes(const Settings &) const
{
UInt64 total_bytes = 0;
for (const auto & status : getDirectoryQueueStatuses())
total_bytes += status.bytes_count;
return total_bytes;
}
2015-09-18 13:36:10 +00:00
size_t StorageDistributed::getShardCount() const
{
return getCluster()->getShardCount();
}
ClusterPtr StorageDistributed::getCluster() const
{
return owned_cluster ? owned_cluster : getContext()->getCluster(cluster_name);
}
ClusterPtr StorageDistributed::getOptimizedCluster(
ContextPtr local_context, const StorageSnapshotPtr & storage_snapshot, const ASTPtr & query_ptr) const
{
ClusterPtr cluster = getCluster();
const Settings & settings = local_context->getSettingsRef();
bool sharding_key_is_usable = settings.allow_nondeterministic_optimize_skip_unused_shards || sharding_key_is_deterministic;
if (has_sharding_key && sharding_key_is_usable)
{
ClusterPtr optimized = skipUnusedShards(cluster, query_ptr, storage_snapshot, local_context);
if (optimized)
return optimized;
}
UInt64 force = settings.force_optimize_skip_unused_shards;
if (force == FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_ALWAYS || (force == FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_HAS_SHARDING_KEY && has_sharding_key))
{
if (!has_sharding_key)
throw Exception(ErrorCodes::UNABLE_TO_SKIP_UNUSED_SHARDS, "No sharding key");
else if (!sharding_key_is_usable)
throw Exception(ErrorCodes::UNABLE_TO_SKIP_UNUSED_SHARDS, "Sharding key is not deterministic");
else
throw Exception(ErrorCodes::UNABLE_TO_SKIP_UNUSED_SHARDS, "Sharding key {} is not used", sharding_key_column_name);
}
return {};
}
IColumn::Selector StorageDistributed::createSelector(const ClusterPtr cluster, const ColumnWithTypeAndName & result)
{
const auto & slot_to_shard = cluster->getSlotToShard();
// If result.type is DataTypeLowCardinality, do shard according to its dictionaryType
#define CREATE_FOR_TYPE(TYPE) \
if (typeid_cast<const DataType##TYPE *>(result.type.get())) \
return createBlockSelector<TYPE>(*result.column, slot_to_shard); \
else if (auto * type_low_cardinality = typeid_cast<const DataTypeLowCardinality *>(result.type.get())) \
if (typeid_cast<const DataType ## TYPE *>(type_low_cardinality->getDictionaryType().get())) \
return createBlockSelector<TYPE>(*result.column->convertToFullColumnIfLowCardinality(), slot_to_shard);
CREATE_FOR_TYPE(UInt8)
CREATE_FOR_TYPE(UInt16)
CREATE_FOR_TYPE(UInt32)
CREATE_FOR_TYPE(UInt64)
CREATE_FOR_TYPE(Int8)
CREATE_FOR_TYPE(Int16)
CREATE_FOR_TYPE(Int32)
CREATE_FOR_TYPE(Int64)
#undef CREATE_FOR_TYPE
throw Exception(ErrorCodes::TYPE_MISMATCH, "Sharding key expression does not evaluate to an integer type");
}
2018-12-19 12:38:13 +00:00
/// Returns a new cluster with fewer shards if constant folding for `sharding_key_expr` is possible
/// using constraints from "PREWHERE" and "WHERE" conditions, otherwise returns `nullptr`
ClusterPtr StorageDistributed::skipUnusedShards(
ClusterPtr cluster,
const ASTPtr & query_ptr,
const StorageSnapshotPtr & storage_snapshot,
ContextPtr local_context) const
2018-12-19 12:38:13 +00:00
{
const auto & select = query_ptr->as<ASTSelectQuery &>();
2018-12-19 12:38:13 +00:00
if (!select.prewhere() && !select.where())
{
2018-12-19 12:38:13 +00:00
return nullptr;
}
ASTPtr condition_ast;
if (select.prewhere() && select.where())
{
condition_ast = makeASTFunction("and", select.prewhere()->clone(), select.where()->clone());
}
else
{
condition_ast = select.prewhere() ? select.prewhere()->clone() : select.where()->clone();
}
2018-12-19 12:38:13 +00:00
replaceConstantExpressions(condition_ast, local_context, storage_snapshot->metadata->getColumns().getAll(), shared_from_this(), storage_snapshot);
2018-12-19 12:38:13 +00:00
size_t limit = local_context->getSettingsRef().optimize_skip_unused_shards_limit;
if (!limit || limit > SSIZE_MAX)
{
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "optimize_skip_unused_shards_limit out of range (0, {}]", SSIZE_MAX);
}
// To interpret limit==0 as limit is reached
++limit;
const auto blocks = evaluateExpressionOverConstantCondition(condition_ast, sharding_key_expr, limit);
if (!limit)
2018-12-19 12:38:13 +00:00
{
2021-04-23 17:51:47 +00:00
LOG_DEBUG(log,
"Number of values for sharding key exceeds optimize_skip_unused_shards_limit={}, "
"try to increase it, but note that this may increase query processing time.",
local_context->getSettingsRef().optimize_skip_unused_shards_limit);
2018-12-19 12:38:13 +00:00
return nullptr;
}
// Can't get definite answer if we can skip any shards
if (!blocks)
{
return nullptr;
}
std::set<int> shards;
for (const auto & block : *blocks)
{
if (!block.has(sharding_key_column_name))
throw Exception(ErrorCodes::TOO_MANY_ROWS, "sharding_key_expr should evaluate as a single row");
2018-12-19 12:38:13 +00:00
2020-03-18 03:27:32 +00:00
const ColumnWithTypeAndName & result = block.getByName(sharding_key_column_name);
2018-12-19 12:38:13 +00:00
const auto selector = createSelector(cluster, result);
shards.insert(selector.begin(), selector.end());
}
return cluster->getClusterWithMultipleShards({shards.begin(), shards.end()});
}
ActionLock StorageDistributed::getActionLock(StorageActionBlockType type)
{
2019-04-22 15:11:16 +00:00
if (type == ActionLocks::DistributedSend)
return monitors_blocker.cancel();
return {};
}
2021-07-01 13:21:38 +00:00
void StorageDistributed::flush()
{
2021-07-01 16:43:59 +00:00
try
{
flushClusterNodesAllData(getContext());
}
catch (...)
{
tryLogCurrentException(log, "Cannot flush");
}
2021-07-01 13:21:38 +00:00
}
void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context)
{
/// Sync SYSTEM FLUSH DISTRIBUTED with TRUNCATE
auto table_lock = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
std::vector<std::shared_ptr<DistributedAsyncInsertDirectoryQueue>> directory_monitors;
{
std::lock_guard lock(cluster_nodes_mutex);
directory_monitors.reserve(cluster_nodes_data.size());
for (auto & node : cluster_nodes_data)
directory_monitors.push_back(node.second.directory_monitor);
}
/// TODO: Maybe it should be executed in parallel
for (auto & node : directory_monitors)
node->flushAllData();
}
2020-04-07 14:05:51 +00:00
void StorageDistributed::rename(const String & new_path_to_table_data, const StorageID & new_table_id)
2019-12-19 19:39:49 +00:00
{
2020-09-18 19:25:56 +00:00
assert(relative_data_path != new_path_to_table_data);
if (!relative_data_path.empty())
renameOnDisk(new_path_to_table_data);
2020-04-07 14:05:51 +00:00
renameInMemory(new_table_id);
}
2020-07-23 14:10:48 +00:00
size_t StorageDistributed::getRandomShardIndex(const Cluster::ShardsInfo & shards)
{
UInt32 total_weight = 0;
for (const auto & shard : shards)
total_weight += shard.weight;
assert(total_weight > 0);
size_t res;
{
std::lock_guard lock(rng_mutex);
res = std::uniform_int_distribution<size_t>(0, total_weight - 1)(rng);
}
for (auto i = 0ul, s = shards.size(); i < s; ++i)
{
if (shards[i].weight > res)
return i;
res -= shards[i].weight;
}
UNREACHABLE();
}
void StorageDistributed::renameOnDisk(const String & new_path_to_table_data)
{
for (const DiskPtr & disk : data_volume->getDisks())
2019-12-19 19:39:49 +00:00
{
2021-06-24 10:00:33 +00:00
disk->createDirectories(new_path_to_table_data);
2020-09-18 19:08:53 +00:00
disk->moveDirectory(relative_data_path, new_path_to_table_data);
2020-09-18 19:08:53 +00:00
auto new_path = disk->getPath() + new_path_to_table_data;
2020-05-23 22:24:01 +00:00
LOG_DEBUG(log, "Updating path to {}", new_path);
2019-12-19 19:39:49 +00:00
std::lock_guard lock(cluster_nodes_mutex);
for (auto & node : cluster_nodes_data)
node.second.directory_monitor->updatePath(new_path_to_table_data);
2019-12-19 19:39:49 +00:00
}
relative_data_path = new_path_to_table_data;
2019-12-19 19:39:49 +00:00
}
void StorageDistributed::delayInsertOrThrowIfNeeded() const
{
if (!distributed_settings.bytes_to_throw_insert &&
!distributed_settings.bytes_to_delay_insert)
return;
UInt64 total_bytes = *totalBytes(getContext()->getSettingsRef());
if (distributed_settings.bytes_to_throw_insert && total_bytes > distributed_settings.bytes_to_throw_insert)
{
ProfileEvents::increment(ProfileEvents::DistributedRejectedInserts);
throw Exception(ErrorCodes::DISTRIBUTED_TOO_MANY_PENDING_BYTES,
"Too many bytes pending for async INSERT: {} (bytes_to_throw_insert={})",
formatReadableSizeWithBinarySuffix(total_bytes),
formatReadableSizeWithBinarySuffix(distributed_settings.bytes_to_throw_insert));
}
if (distributed_settings.bytes_to_delay_insert && total_bytes > distributed_settings.bytes_to_delay_insert)
{
/// Step is 5% of the delay and minimal one second.
/// NOTE: max_delay_to_insert is in seconds, and step is in ms.
2022-09-11 01:21:34 +00:00
const size_t step_ms = static_cast<size_t>(std::min<double>(1., static_cast<double>(distributed_settings.max_delay_to_insert) * 1'000 * 0.05));
UInt64 delayed_ms = 0;
do {
delayed_ms += step_ms;
std::this_thread::sleep_for(std::chrono::milliseconds(step_ms));
} while (*totalBytes(getContext()->getSettingsRef()) > distributed_settings.bytes_to_delay_insert && delayed_ms < distributed_settings.max_delay_to_insert*1000);
ProfileEvents::increment(ProfileEvents::DistributedDelayedInserts);
ProfileEvents::increment(ProfileEvents::DistributedDelayedInsertsMilliseconds, delayed_ms);
UInt64 new_total_bytes = *totalBytes(getContext()->getSettingsRef());
LOG_INFO(log, "Too many bytes pending for async INSERT: was {}, now {}, INSERT was delayed to {} ms",
formatReadableSizeWithBinarySuffix(total_bytes),
formatReadableSizeWithBinarySuffix(new_total_bytes),
delayed_ms);
if (new_total_bytes > distributed_settings.bytes_to_delay_insert)
{
ProfileEvents::increment(ProfileEvents::DistributedRejectedInserts);
throw Exception(ErrorCodes::DISTRIBUTED_TOO_MANY_PENDING_BYTES,
"Too many bytes pending for async INSERT: {} (bytes_to_delay_insert={})",
formatReadableSizeWithBinarySuffix(new_total_bytes),
formatReadableSizeWithBinarySuffix(distributed_settings.bytes_to_delay_insert));
}
}
}
void registerStorageDistributed(StorageFactory & factory)
{
factory.registerStorage("Distributed", [](const StorageFactory::Arguments & args)
{
/** Arguments of engine is following:
* - name of cluster in configuration;
* - name of remote database;
* - name of remote table;
* - policy to store data in;
*
* Remote database may be specified in following form:
* - identifier;
* - constant expression with string result, like currentDatabase();
* -- string literal as specific case;
* - empty string means 'use default database from cluster'.
*
* Distributed engine also supports SETTINGS clause.
*/
ASTs & engine_args = args.engine_args;
if (engine_args.size() < 3 || engine_args.size() > 5)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage Distributed requires from 3 "
"to 5 parameters - name of configuration section with list "
"of remote servers, name of remote database, name "
"of remote table, sharding key expression (optional), policy to store data in (optional).");
2020-02-21 13:44:44 +00:00
String cluster_name = getClusterNameAndMakeLiteral(engine_args[0]);
const ContextPtr & context = args.getContext();
const ContextPtr & local_context = args.getLocalContext();
engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], local_context);
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], local_context);
String remote_database = checkAndGetLiteralArgument<String>(engine_args[1], "remote_database");
String remote_table = checkAndGetLiteralArgument<String>(engine_args[2], "remote_table");
const auto & sharding_key = engine_args.size() >= 4 ? engine_args[3] : nullptr;
String storage_policy = "default";
if (engine_args.size() >= 5)
{
engine_args[4] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[4], local_context);
storage_policy = checkAndGetLiteralArgument<String>(engine_args[4], "storage_policy");
}
/// Check that sharding_key exists in the table and has numeric type.
if (sharding_key)
{
auto sharding_expr = buildShardingKeyExpression(sharding_key, context, args.columns.getAllPhysical(), true);
const Block & block = sharding_expr->getSampleBlock();
if (block.columns() != 1)
throw Exception(ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS, "Sharding expression must return exactly one column");
auto type = block.getByPosition(0).type;
if (!type->isValueRepresentedByInteger())
throw Exception(ErrorCodes::TYPE_MISMATCH, "Sharding expression has type {}, but should be one of integer type",
type->getName());
}
/// TODO: move some arguments from the arguments to the SETTINGS.
DistributedSettings distributed_settings;
if (args.storage_def->settings)
{
distributed_settings.loadFromQuery(*args.storage_def);
}
if (distributed_settings.max_delay_to_insert < 1)
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND,
"max_delay_to_insert cannot be less then 1");
if (distributed_settings.bytes_to_throw_insert && distributed_settings.bytes_to_delay_insert &&
distributed_settings.bytes_to_throw_insert <= distributed_settings.bytes_to_delay_insert)
{
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND,
"bytes_to_throw_insert cannot be less or equal to bytes_to_delay_insert (since it is handled first)");
}
/// Set default values from the distributed_directory_monitor_* global context settings.
if (!distributed_settings.monitor_batch_inserts.changed)
distributed_settings.monitor_batch_inserts = context->getSettingsRef().distributed_directory_monitor_batch_inserts;
if (!distributed_settings.monitor_split_batch_on_failure.changed)
distributed_settings.monitor_split_batch_on_failure = context->getSettingsRef().distributed_directory_monitor_split_batch_on_failure;
if (!distributed_settings.monitor_sleep_time_ms.changed)
distributed_settings.monitor_sleep_time_ms = Poco::Timespan(context->getSettingsRef().distributed_directory_monitor_sleep_time_ms);
if (!distributed_settings.monitor_max_sleep_time_ms.changed)
distributed_settings.monitor_max_sleep_time_ms = Poco::Timespan(context->getSettingsRef().distributed_directory_monitor_max_sleep_time_ms);
return std::make_shared<StorageDistributed>(
2021-04-23 12:18:23 +00:00
args.table_id,
args.columns,
args.constraints,
args.comment,
remote_database,
remote_table,
cluster_name,
context,
sharding_key,
storage_policy,
args.relative_data_path,
distributed_settings,
args.attach);
},
{
.supports_settings = true,
.supports_parallel_insert = true,
.supports_schema_inference = true,
.source_access_type = AccessType::REMOTE,
});
}
2012-05-21 20:38:34 +00:00
}