2018-12-19 12:38:13 +00:00
|
|
|
#include <Storages/StorageDistributed.h>
|
|
|
|
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Databases/IDatabase.h>
|
2021-04-04 10:27:45 +00:00
|
|
|
|
2021-01-09 12:26:37 +00:00
|
|
|
#include <Disks/IDisk.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
|
2021-07-14 13:17:30 +00:00
|
|
|
#include <DataStreams/RemoteQueryExecutor.h>
|
2021-04-04 10:27:45 +00:00
|
|
|
|
2018-07-05 20:38:05 +00:00
|
|
|
#include <DataTypes/DataTypeFactory.h>
|
2020-11-20 17:23:53 +00:00
|
|
|
#include <DataTypes/DataTypeUUID.h>
|
2018-12-19 12:38:13 +00:00
|
|
|
#include <DataTypes/DataTypesNumber.h>
|
2018-07-05 20:38:05 +00:00
|
|
|
|
2018-12-19 12:38:13 +00:00
|
|
|
#include <Storages/Distributed/DistributedBlockOutputStream.h>
|
2017-12-30 00:36:06 +00:00
|
|
|
#include <Storages/StorageFactory.h>
|
2018-12-25 23:14:39 +00:00
|
|
|
#include <Storages/AlterCommands.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
|
2020-04-10 09:24:16 +00:00
|
|
|
#include <Columns/ColumnConst.h>
|
|
|
|
|
2018-01-22 15:56:30 +00:00
|
|
|
#include <Common/Macros.h>
|
2021-01-27 18:43:41 +00:00
|
|
|
#include <Common/ProfileEvents.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Common/escapeForFileName.h>
|
2017-07-13 20:58:19 +00:00
|
|
|
#include <Common/typeid_cast.h>
|
2020-04-16 21:54:43 +00:00
|
|
|
#include <Common/quoteString.h>
|
2020-12-23 16:04:05 +00:00
|
|
|
#include <Common/randomSeed.h>
|
2021-01-26 18:45:37 +00:00
|
|
|
#include <Common/formatReadable.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
|
2018-12-19 12:38:13 +00:00
|
|
|
#include <Parsers/ASTDropQuery.h>
|
|
|
|
#include <Parsers/ASTExpressionList.h>
|
|
|
|
#include <Parsers/ASTIdentifier.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Parsers/ASTInsertQuery.h>
|
2018-12-19 12:38:13 +00:00
|
|
|
#include <Parsers/ASTLiteral.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Parsers/ASTSelectQuery.h>
|
2018-12-19 12:38:13 +00:00
|
|
|
#include <Parsers/ASTTablesInSelectQuery.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Parsers/ParserAlterQuery.h>
|
2018-12-19 12:38:13 +00:00
|
|
|
#include <Parsers/TablePropertiesQueriesASTs.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Parsers/parseQuery.h>
|
2021-04-04 10:27:45 +00:00
|
|
|
#include <Parsers/queryToString.h>
|
2021-03-04 17:38:12 +00:00
|
|
|
|
2018-12-19 12:38:13 +00:00
|
|
|
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
|
|
|
|
#include <Interpreters/ClusterProxy/executeQuery.h>
|
2020-06-13 16:31:28 +00:00
|
|
|
#include <Interpreters/Cluster.h>
|
2018-12-19 12:38:13 +00:00
|
|
|
#include <Interpreters/ExpressionAnalyzer.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Interpreters/InterpreterAlterQuery.h>
|
|
|
|
#include <Interpreters/InterpreterDescribeQuery.h>
|
2018-12-19 12:38:13 +00:00
|
|
|
#include <Interpreters/InterpreterSelectQuery.h>
|
2021-04-04 10:27:45 +00:00
|
|
|
#include <Interpreters/JoinedTables.h>
|
2019-07-26 17:43:42 +00:00
|
|
|
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
|
2020-07-22 17:13:05 +00:00
|
|
|
#include <Interpreters/TreeRewriter.h>
|
2020-05-20 20:16:32 +00:00
|
|
|
#include <Interpreters/Context.h>
|
2018-12-19 12:38:13 +00:00
|
|
|
#include <Interpreters/createBlockSelector.h>
|
2017-12-30 00:36:06 +00:00
|
|
|
#include <Interpreters/evaluateConstantExpression.h>
|
|
|
|
#include <Interpreters/getClusterName.h>
|
2020-04-01 14:21:37 +00:00
|
|
|
#include <Interpreters/getTableExpressions.h>
|
2020-06-16 18:49:04 +00:00
|
|
|
#include <Functions/IFunction.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
|
2021-04-04 10:27:45 +00:00
|
|
|
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
|
|
|
|
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
|
2021-03-09 19:00:38 +00:00
|
|
|
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
|
|
|
|
#include <Processors/Sources/NullSource.h>
|
2021-07-14 13:17:30 +00:00
|
|
|
#include <Processors/Sources/RemoteSource.h>
|
2021-04-04 10:27:45 +00:00
|
|
|
#include <Processors/Sources/SourceFromInputStream.h>
|
|
|
|
#include <Processors/NullSink.h>
|
2021-03-09 19:00:38 +00:00
|
|
|
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Core/Field.h>
|
2020-06-13 16:31:28 +00:00
|
|
|
#include <Core/Settings.h>
|
2012-05-21 20:38:34 +00:00
|
|
|
|
2018-06-05 19:46:49 +00:00
|
|
|
#include <IO/ReadHelpers.h>
|
2020-11-09 19:07:38 +00:00
|
|
|
#include <IO/WriteBufferFromString.h>
|
|
|
|
#include <IO/Operators.h>
|
2021-04-04 10:27:45 +00:00
|
|
|
#include <IO/ConnectionTimeoutsContext.h>
|
2018-06-05 19:46:49 +00:00
|
|
|
|
2016-12-12 03:33:34 +00:00
|
|
|
#include <Poco/DirectoryIterator.h>
|
|
|
|
|
2015-02-10 21:10:58 +00:00
|
|
|
#include <memory>
|
2019-07-31 22:37:41 +00:00
|
|
|
#include <filesystem>
|
2020-04-22 21:44:22 +00:00
|
|
|
#include <optional>
|
2020-09-18 19:25:56 +00:00
|
|
|
#include <cassert>
|
2017-05-10 06:39:37 +00:00
|
|
|
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2021-04-27 00:05:43 +00:00
|
|
|
namespace fs = std::filesystem;
|
|
|
|
|
2020-01-23 17:48:05 +00:00
|
|
|
namespace
|
|
|
|
{
|
2020-03-09 01:03:43 +00:00
|
|
|
const UInt64 FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_HAS_SHARDING_KEY = 1;
|
|
|
|
const UInt64 FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_ALWAYS = 2;
|
2020-08-15 13:25:30 +00:00
|
|
|
|
|
|
|
const UInt64 DISTRIBUTED_GROUP_BY_NO_MERGE_AFTER_AGGREGATION = 2;
|
2021-04-04 10:27:45 +00:00
|
|
|
|
|
|
|
const UInt64 PARALLEL_DISTRIBUTED_INSERT_SELECT_ALL = 2;
|
2020-01-23 17:48:05 +00:00
|
|
|
}
|
|
|
|
|
2021-01-27 18:43:41 +00:00
|
|
|
namespace ProfileEvents
|
|
|
|
{
|
|
|
|
extern const Event DistributedRejectedInserts;
|
|
|
|
extern const Event DistributedDelayedInserts;
|
|
|
|
extern const Event DistributedDelayedInsertsMilliseconds;
|
|
|
|
}
|
|
|
|
|
2012-05-21 20:38:34 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2016-01-11 21:46:36 +00:00
|
|
|
namespace ErrorCodes
{
    /// Error codes thrown from this translation unit (defined elsewhere), kept sorted.
    extern const int ALTER_OF_COLUMN_IS_FORBIDDEN;
    extern const int ARGUMENT_OUT_OF_BOUND;
    extern const int BAD_ARGUMENTS;
    extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES;
    extern const int INCORRECT_NUMBER_OF_COLUMNS;
    extern const int INFINITE_LOOP;
    extern const int INVALID_SHARD_ID;
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int STORAGE_REQUIRES_PARAMETER;
    extern const int TOO_MANY_ROWS;
    extern const int TYPE_MISMATCH;
    extern const int UNABLE_TO_SKIP_UNUSED_SHARDS;
}
|
|
|
|
|
2019-04-08 05:13:16 +00:00
|
|
|
namespace ActionLocks
|
|
|
|
{
|
2019-04-22 15:11:16 +00:00
|
|
|
extern const StorageActionBlockType DistributedSend;
|
2019-04-08 05:13:16 +00:00
|
|
|
}
|
2016-01-11 21:46:36 +00:00
|
|
|
|
2014-08-21 12:07:29 +00:00
|
|
|
namespace
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-07-25 12:31:47 +00:00
|
|
|
/// select query has database, table and table function names as AST pointers
|
|
|
|
/// Creates a copy of query, changes database, table and table function names.
|
2018-07-24 13:10:34 +00:00
|
|
|
ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, const std::string & table, ASTPtr table_function_ptr = nullptr)
|
2017-05-10 06:39:37 +00:00
|
|
|
{
|
2018-02-25 00:50:53 +00:00
|
|
|
auto modified_query_ast = query->clone();
|
2019-07-26 17:43:42 +00:00
|
|
|
|
|
|
|
ASTSelectQuery & select_query = modified_query_ast->as<ASTSelectQuery &>();
|
2020-09-16 09:57:26 +00:00
|
|
|
|
|
|
|
// Get rid of the settings clause so we don't send them to remote. Thus newly non-important
|
|
|
|
// settings won't break any remote parser. It's also more reasonable since the query settings
|
|
|
|
// are written into the query context and will be sent by the query pipeline.
|
|
|
|
select_query.setExpression(ASTSelectQuery::Expression::SETTINGS, {});
|
|
|
|
|
2020-04-01 14:21:37 +00:00
|
|
|
if (table_function_ptr)
|
|
|
|
select_query.addTableFunction(table_function_ptr);
|
|
|
|
else
|
|
|
|
select_query.replaceDatabaseAndTable(database, table);
|
2019-07-26 17:43:42 +00:00
|
|
|
|
2020-04-01 14:21:37 +00:00
|
|
|
/// Restore long column names (cause our short names are ambiguous).
|
|
|
|
/// TODO: aliased table functions & CREATE TABLE AS table function cases
|
|
|
|
if (!table_function_ptr)
|
2019-07-26 17:43:42 +00:00
|
|
|
{
|
|
|
|
RestoreQualifiedNamesVisitor::Data data;
|
2020-04-01 14:21:37 +00:00
|
|
|
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query->as<ASTSelectQuery &>(), 0));
|
|
|
|
data.remote_table.database = database;
|
|
|
|
data.remote_table.table = table;
|
|
|
|
data.rename = true;
|
|
|
|
RestoreQualifiedNamesVisitor(data).visit(modified_query_ast);
|
2019-07-26 17:43:42 +00:00
|
|
|
}
|
|
|
|
|
2017-05-10 06:39:37 +00:00
|
|
|
return modified_query_ast;
|
|
|
|
}
|
|
|
|
|
2018-11-23 17:39:16 +00:00
|
|
|
/// The columns list in the original INSERT query is incorrect because inserted blocks are transformed
|
|
|
|
/// to the form of the sample block of the Distributed table. So we rewrite it and add all columns from
|
|
|
|
/// the sample block instead.
|
2021-04-20 06:48:42 +00:00
|
|
|
ASTPtr createInsertToRemoteTableQuery(const std::string & database, const std::string & table, const Block & sample_block)
|
2017-05-10 06:39:37 +00:00
|
|
|
{
|
2018-11-23 17:39:16 +00:00
|
|
|
auto query = std::make_shared<ASTInsertQuery>();
|
2020-03-02 20:23:58 +00:00
|
|
|
query->table_id = StorageID(database, table);
|
2017-05-10 06:39:37 +00:00
|
|
|
|
2019-05-31 18:31:09 +00:00
|
|
|
auto columns = std::make_shared<ASTExpressionList>();
|
|
|
|
query->columns = columns;
|
|
|
|
query->children.push_back(columns);
|
2021-04-20 06:48:42 +00:00
|
|
|
for (const auto & col : sample_block)
|
2019-05-31 18:31:09 +00:00
|
|
|
columns->children.push_back(std::make_shared<ASTIdentifier>(col.name));
|
2017-05-10 06:39:37 +00:00
|
|
|
|
2018-11-23 17:39:16 +00:00
|
|
|
return query;
|
2017-05-10 06:39:37 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Calculate maximum number in file names in directory and all subdirectories.
|
|
|
|
/// To ensure global order of data blocks yet to be sent across server restarts.
|
2019-01-04 12:10:00 +00:00
|
|
|
UInt64 getMaximumFileNumber(const std::string & dir_path)
|
2017-05-10 06:39:37 +00:00
|
|
|
{
|
|
|
|
UInt64 res = 0;
|
|
|
|
|
2019-07-31 22:37:41 +00:00
|
|
|
std::filesystem::recursive_directory_iterator begin(dir_path);
|
|
|
|
std::filesystem::recursive_directory_iterator end;
|
2017-05-10 06:39:37 +00:00
|
|
|
for (auto it = begin; it != end; ++it)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2019-01-04 12:10:00 +00:00
|
|
|
const auto & file_path = it->path();
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2019-07-31 22:37:41 +00:00
|
|
|
if (!std::filesystem::is_regular_file(*it) || !endsWith(file_path.filename().string(), ".bin"))
|
2017-05-10 06:39:37 +00:00
|
|
|
continue;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-05-10 06:39:37 +00:00
|
|
|
UInt64 num = 0;
|
|
|
|
try
|
|
|
|
{
|
2019-01-04 12:10:00 +00:00
|
|
|
num = parse<UInt64>(file_path.filename().stem().string());
|
2017-05-10 06:39:37 +00:00
|
|
|
}
|
|
|
|
catch (Exception & e)
|
|
|
|
{
|
2019-01-04 12:10:00 +00:00
|
|
|
e.addMessage("Unexpected file name " + file_path.filename().string() + " found at " + file_path.parent_path().string() + ", should have numeric base name.");
|
2017-05-10 06:39:37 +00:00
|
|
|
throw;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (num > res)
|
|
|
|
res = num;
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2017-05-10 06:39:37 +00:00
|
|
|
|
|
|
|
return res;
|
|
|
|
}
|
|
|
|
|
2019-08-19 20:28:24 +00:00
|
|
|
std::string makeFormattedListOfShards(const ClusterPtr & cluster)
|
|
|
|
{
|
2020-11-09 19:07:38 +00:00
|
|
|
WriteBufferFromOwnString buf;
|
2019-08-19 20:28:24 +00:00
|
|
|
|
|
|
|
bool head = true;
|
2020-11-09 19:07:38 +00:00
|
|
|
buf << "[";
|
2019-08-19 20:28:24 +00:00
|
|
|
for (const auto & shard_info : cluster->getShardsInfo())
|
|
|
|
{
|
2020-11-09 19:07:38 +00:00
|
|
|
(head ? buf : buf << ", ") << shard_info.shard_num;
|
2019-08-19 20:28:24 +00:00
|
|
|
head = false;
|
|
|
|
}
|
2020-11-09 19:07:38 +00:00
|
|
|
buf << "]";
|
2019-08-19 20:28:24 +00:00
|
|
|
|
2020-11-09 19:07:38 +00:00
|
|
|
return buf.str();
|
2019-08-19 20:28:24 +00:00
|
|
|
}
|
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
/// Analyzes the `sharding_key` expression against `columns` and returns the actions that evaluate it.
/// `project` is passed through to ExpressionAnalyzer::getActions().
ExpressionActionsPtr buildShardingKeyExpression(const ASTPtr & sharding_key, ContextPtr context, const NamesAndTypesList & columns, bool project)
{
    /// Work on a local copy of the pointer; presumably because analyze() may rewrite the AST
    /// it is given — TODO confirm against TreeRewriter::analyze's signature.
    ASTPtr expression = sharding_key;
    const auto analysis = TreeRewriter(context).analyze(expression, columns);

    ExpressionAnalyzer analyzer(expression, analysis, context);
    return analyzer.getActions(project);
}
|
|
|
|
|
2021-06-28 17:02:22 +00:00
|
|
|
bool isExpressionActionsDeterministic(const ExpressionActionsPtr & actions)
|
2020-06-16 18:49:04 +00:00
|
|
|
{
|
|
|
|
for (const auto & action : actions->getActions())
|
|
|
|
{
|
2020-11-10 14:54:59 +00:00
|
|
|
if (action.node->type != ActionsDAG::ActionType::FUNCTION)
|
2020-06-16 18:49:04 +00:00
|
|
|
continue;
|
2020-11-03 11:28:28 +00:00
|
|
|
if (!action.node->function_base->isDeterministic())
|
2020-06-16 18:49:04 +00:00
|
|
|
return false;
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2020-03-22 10:37:35 +00:00
|
|
|
class ReplacingConstantExpressionsMatcher
|
|
|
|
{
|
|
|
|
public:
|
|
|
|
using Data = Block;
|
2014-08-13 12:52:30 +00:00
|
|
|
|
2020-03-22 10:37:35 +00:00
|
|
|
static bool needChildVisit(ASTPtr &, const ASTPtr &)
|
|
|
|
{
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
static void visit(ASTPtr & node, Block & block_with_constants)
|
|
|
|
{
|
|
|
|
if (!node->as<ASTFunction>())
|
|
|
|
return;
|
2017-06-06 18:48:38 +00:00
|
|
|
|
2020-03-22 10:37:35 +00:00
|
|
|
std::string name = node->getColumnName();
|
|
|
|
if (block_with_constants.has(name))
|
|
|
|
{
|
|
|
|
auto result = block_with_constants.getByName(name);
|
|
|
|
if (!isColumnConst(*result.column))
|
|
|
|
return;
|
|
|
|
|
2020-03-23 17:28:38 +00:00
|
|
|
node = std::make_shared<ASTLiteral>(assert_cast<const ColumnConst &>(*result.column).getField());
|
2020-03-22 10:37:35 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
2020-03-23 17:28:38 +00:00
|
|
|
|
2020-06-18 09:08:24 +00:00
|
|
|
void replaceConstantExpressions(
|
|
|
|
ASTPtr & node,
|
2021-04-10 23:33:54 +00:00
|
|
|
ContextPtr context,
|
2020-06-18 09:08:24 +00:00
|
|
|
const NamesAndTypesList & columns,
|
|
|
|
ConstStoragePtr storage,
|
|
|
|
const StorageMetadataPtr & metadata_snapshot)
|
2018-11-08 15:43:14 +00:00
|
|
|
{
|
2020-07-22 17:13:05 +00:00
|
|
|
auto syntax_result = TreeRewriter(context).analyze(node, columns, storage, metadata_snapshot);
|
2020-03-22 10:37:35 +00:00
|
|
|
Block block_with_constants = KeyCondition::getBlockWithConstants(node, syntax_result, context);
|
|
|
|
|
|
|
|
InDepthNodeVisitor<ReplacingConstantExpressionsMatcher, true> visitor(block_with_constants);
|
|
|
|
visitor.visit(node);
|
2018-11-08 15:43:14 +00:00
|
|
|
}
|
2017-06-06 18:48:38 +00:00
|
|
|
|
2021-05-19 13:14:33 +00:00
|
|
|
/// This is the implementation of optimize_distributed_group_by_sharding_key.
|
|
|
|
/// It returns up to which stage the query can be processed on a shard, which
|
|
|
|
/// is one of the following:
|
2020-04-22 21:44:22 +00:00
|
|
|
/// - QueryProcessingStage::Complete
|
|
|
|
/// - QueryProcessingStage::WithMergeableStateAfterAggregation
|
2021-06-04 06:43:56 +00:00
|
|
|
/// - QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit
|
2020-04-22 21:44:22 +00:00
|
|
|
/// - none (in this case regular WithMergeableState should be used)
|
Fix optimize_distributed_group_by_sharding_key for multiple columns
Before we incorrectly check that columns from GROUP BY was a subset of
columns from sharding key, while this is not right, consider the
following example:
select k1, any(k2), sum(v) from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)) group by k1
Here the columns from GROUP BY is a subset of columns from sharding key,
but the optimization cannot be applied, since there is no guarantee that
particular shard contains distinct values of k1.
So instead we should check that GROUP BY contains all columns that is
required for calculating sharding key expression, i.e.:
select k1, k2, sum(v) from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)) group by k1, k2
2021-07-15 06:09:58 +00:00
|
|
|
std::optional<QueryProcessingStage::Enum> getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, bool extremes, const Names & sharding_key_columns)
|
2020-04-01 18:38:01 +00:00
|
|
|
{
|
2021-05-05 21:26:14 +00:00
|
|
|
const auto & select = query_info.query->as<ASTSelectQuery &>();
|
2020-04-22 21:44:22 +00:00
|
|
|
|
Fix optimize_distributed_group_by_sharding_key for multiple columns
Before we incorrectly check that columns from GROUP BY was a subset of
columns from sharding key, while this is not right, consider the
following example:
select k1, any(k2), sum(v) from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)) group by k1
Here the columns from GROUP BY is a subset of columns from sharding key,
but the optimization cannot be applied, since there is no guarantee that
particular shard contains distinct values of k1.
So instead we should check that GROUP BY contains all columns that is
required for calculating sharding key expression, i.e.:
select k1, k2, sum(v) from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)) group by k1, k2
2021-07-15 06:09:58 +00:00
|
|
|
auto sharding_block_has = [&](const auto & exprs) -> bool
|
2020-04-22 21:44:22 +00:00
|
|
|
{
|
Fix optimize_distributed_group_by_sharding_key for multiple columns
Before we incorrectly check that columns from GROUP BY was a subset of
columns from sharding key, while this is not right, consider the
following example:
select k1, any(k2), sum(v) from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)) group by k1
Here the columns from GROUP BY is a subset of columns from sharding key,
but the optimization cannot be applied, since there is no guarantee that
particular shard contains distinct values of k1.
So instead we should check that GROUP BY contains all columns that is
required for calculating sharding key expression, i.e.:
select k1, k2, sum(v) from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)) group by k1, k2
2021-07-15 06:09:58 +00:00
|
|
|
std::unordered_set<std::string> expr_columns;
|
2020-04-22 21:44:22 +00:00
|
|
|
for (auto & expr : exprs)
|
|
|
|
{
|
|
|
|
auto id = expr->template as<ASTIdentifier>();
|
|
|
|
if (!id)
|
Fix optimize_distributed_group_by_sharding_key for multiple columns
Before we incorrectly check that columns from GROUP BY was a subset of
columns from sharding key, while this is not right, consider the
following example:
select k1, any(k2), sum(v) from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)) group by k1
Here the columns from GROUP BY is a subset of columns from sharding key,
but the optimization cannot be applied, since there is no guarantee that
particular shard contains distinct values of k1.
So instead we should check that GROUP BY contains all columns that is
required for calculating sharding key expression, i.e.:
select k1, k2, sum(v) from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)) group by k1, k2
2021-07-15 06:09:58 +00:00
|
|
|
continue;
|
|
|
|
expr_columns.emplace(id->name());
|
|
|
|
}
|
|
|
|
|
|
|
|
for (const auto & column : sharding_key_columns)
|
|
|
|
{
|
|
|
|
if (!expr_columns.contains(column))
|
2020-04-22 21:44:22 +00:00
|
|
|
return false;
|
|
|
|
}
|
Fix optimize_distributed_group_by_sharding_key for multiple columns
Before we incorrectly check that columns from GROUP BY was a subset of
columns from sharding key, while this is not right, consider the
following example:
select k1, any(k2), sum(v) from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)) group by k1
Here the columns from GROUP BY is a subset of columns from sharding key,
but the optimization cannot be applied, since there is no guarantee that
particular shard contains distinct values of k1.
So instead we should check that GROUP BY contains all columns that is
required for calculating sharding key expression, i.e.:
select k1, k2, sum(v) from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)) group by k1, k2
2021-07-15 06:09:58 +00:00
|
|
|
|
2020-04-22 21:44:22 +00:00
|
|
|
return true;
|
|
|
|
};
|
|
|
|
|
|
|
|
// GROUP BY qualifiers
|
|
|
|
// - TODO: WITH TOTALS can be implemented
|
|
|
|
// - TODO: WITH ROLLUP can be implemented (I guess)
|
|
|
|
if (select.group_by_with_totals || select.group_by_with_rollup || select.group_by_with_cube)
|
|
|
|
return {};
|
|
|
|
|
2021-05-05 21:26:14 +00:00
|
|
|
// Window functions are not supported.
|
|
|
|
if (query_info.has_window)
|
|
|
|
return {};
|
|
|
|
|
2020-04-22 21:44:22 +00:00
|
|
|
// TODO: extremes support can be implemented
|
|
|
|
if (extremes)
|
|
|
|
return {};
|
|
|
|
|
|
|
|
// DISTINCT
|
|
|
|
if (select.distinct)
|
|
|
|
{
|
|
|
|
if (!sharding_block_has(select.select()->children))
|
|
|
|
return {};
|
|
|
|
}
|
2020-04-01 18:38:01 +00:00
|
|
|
|
2020-04-22 21:44:22 +00:00
|
|
|
// GROUP BY
|
|
|
|
const ASTPtr group_by = select.groupBy();
|
|
|
|
if (!group_by)
|
|
|
|
{
|
|
|
|
if (!select.distinct)
|
|
|
|
return {};
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
Fix optimize_distributed_group_by_sharding_key for multiple columns
Before we incorrectly check that columns from GROUP BY was a subset of
columns from sharding key, while this is not right, consider the
following example:
select k1, any(k2), sum(v) from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)) group by k1
Here the columns from GROUP BY is a subset of columns from sharding key,
but the optimization cannot be applied, since there is no guarantee that
particular shard contains distinct values of k1.
So instead we should check that GROUP BY contains all columns that is
required for calculating sharding key expression, i.e.:
select k1, k2, sum(v) from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)) group by k1, k2
2021-07-15 06:09:58 +00:00
|
|
|
if (!sharding_block_has(group_by->children))
|
2020-04-22 21:44:22 +00:00
|
|
|
return {};
|
|
|
|
}
|
|
|
|
|
|
|
|
// ORDER BY
|
|
|
|
const ASTPtr order_by = select.orderBy();
|
|
|
|
if (order_by)
|
2021-06-04 06:43:56 +00:00
|
|
|
return QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
|
2020-04-22 21:44:22 +00:00
|
|
|
|
|
|
|
// LIMIT BY
|
|
|
|
// LIMIT
|
2020-12-02 17:11:39 +00:00
|
|
|
// OFFSET
|
|
|
|
if (select.limitBy() || select.limitLength() || select.limitOffset())
|
2021-06-04 06:43:56 +00:00
|
|
|
return QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
|
2020-04-22 21:44:22 +00:00
|
|
|
|
|
|
|
// Only simple SELECT FROM GROUP BY sharding_key can use Complete state.
|
|
|
|
return QueryProcessingStage::Complete;
|
|
|
|
}
|
|
|
|
|
|
|
|
size_t getClusterQueriedNodes(const Settings & settings, const ClusterPtr & cluster)
|
|
|
|
{
|
2020-04-01 18:38:01 +00:00
|
|
|
size_t num_local_shards = cluster->getLocalShardCount();
|
|
|
|
size_t num_remote_shards = cluster->getRemoteShardCount();
|
2020-04-22 21:44:22 +00:00
|
|
|
return (num_remote_shards * settings.max_parallel_replicas) + num_local_shards;
|
2020-04-01 18:38:01 +00:00
|
|
|
}
|
|
|
|
|
2020-03-26 18:25:26 +00:00
|
|
|
}
|
2020-03-22 10:37:35 +00:00
|
|
|
|
|
|
|
|
|
|
|
/// For destruction of std::unique_ptr of type that is incomplete in class definition.
|
|
|
|
StorageDistributed::~StorageDistributed() = default;
|
2020-01-20 17:54:52 +00:00
|
|
|
|
2020-04-24 09:20:09 +00:00
|
|
|
|
2020-04-28 10:38:57 +00:00
|
|
|
/// Virtual columns exposed by the Distributed table, in declaration order.
NamesAndTypesList StorageDistributed::getVirtuals() const
{
    /// NOTE This is weird. Most of these virtual columns are part of MergeTree
    /// tables info. But Distributed is general-purpose engine.
    NamesAndTypesList virtuals;
    virtuals.emplace_back("_table", std::make_shared<DataTypeString>());
    virtuals.emplace_back("_part", std::make_shared<DataTypeString>());
    virtuals.emplace_back("_part_index", std::make_shared<DataTypeUInt64>());
    virtuals.emplace_back("_part_uuid", std::make_shared<DataTypeUUID>());
    virtuals.emplace_back("_partition_id", std::make_shared<DataTypeString>());
    virtuals.emplace_back("_sample_factor", std::make_shared<DataTypeFloat64>());
    virtuals.emplace_back("_shard_num", std::make_shared<DataTypeUInt32>());
    return virtuals;
}
|
2020-04-24 09:20:09 +00:00
|
|
|
|
2014-09-30 03:08:47 +00:00
|
|
|
/// Main constructor.
/// - Publishes the columns, constraints and comment as the table's in-memory metadata.
/// - Builds the sharding key expression when a sharding key is given.
/// - Resolves the storage policy when the table has a relative data path.
/// - For a new (non-attached) table, rejects a definition that points at itself on a cluster
///   with local shards.
StorageDistributed::StorageDistributed(
    const StorageID & id_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const String & comment,
    const String & remote_database_,
    const String & remote_table_,
    const String & cluster_name_,
    ContextPtr context_,
    const ASTPtr & sharding_key_,
    const String & storage_policy_name_,
    const String & relative_data_path_,
    const DistributedSettings & distributed_settings_,
    bool attach_,
    ClusterPtr owned_cluster_)
    : IStorage(id_)
    , WithContext(context_->getGlobalContext())
    , remote_database(remote_database_)
    , remote_table(remote_table_)
    , log(&Poco::Logger::get("StorageDistributed (" + id_.table_name + ")"))
    , owned_cluster(std::move(owned_cluster_))
    , cluster_name(getContext()->getMacros()->expand(cluster_name_))
    , has_sharding_key(sharding_key_)
    , relative_data_path(relative_data_path_)
    , distributed_settings(distributed_settings_)
    , rng(randomSeed())
{
    StorageInMemoryMetadata metadata;
    metadata.setColumns(columns_);
    metadata.setConstraints(constraints_);
    metadata.setComment(comment);
    setInMemoryMetadata(metadata);

    if (sharding_key_)
    {
        /// The sharding key is analyzed against all physical columns of the table.
        const auto physical_columns = metadata.getColumns().getAllPhysical();
        sharding_key_expr = buildShardingKeyExpression(sharding_key_, getContext(), physical_columns, false);
        sharding_key_column_name = sharding_key_->getColumnName();
        sharding_key_is_deterministic = isExpressionActionsDeterministic(sharding_key_expr);
    }

    /// A storage policy is resolved only when the table has a data path. Only its first volume is used.
    if (!relative_data_path.empty())
    {
        storage_policy = getContext()->getStoragePolicy(storage_policy_name_);
        data_volume = storage_policy->getVolume(0);

        const bool has_extra_volumes = storage_policy->getVolumes().size() > 1;
        if (has_extra_volumes)
            LOG_WARNING(log, "Storage policy for Distributed table has multiple volumes. "
                "Only {} volume will be used to store data. Other will be ignored.", data_volume->getName());
    }

    /// Sanity check. Skip check if the table is already created to allow the server to start.
    const bool is_new_table = !attach_;
    if (is_new_table && !cluster_name.empty())
    {
        const size_t num_local_shards = getContext()->getCluster(cluster_name)->getLocalShardCount();
        const bool points_to_itself = remote_database == id_.database_name && remote_table == id_.table_name;
        if (num_local_shards && points_to_itself)
            throw Exception("Distributed table " + id_.table_name + " looks at itself", ErrorCodes::INFINITE_LOOP);
    }
}
|
|
|
|
|
2017-03-09 03:34:09 +00:00
|
|
|
|
2018-07-24 13:10:34 +00:00
|
|
|
StorageDistributed::StorageDistributed(
|
2019-12-04 16:06:55 +00:00
|
|
|
const StorageID & id_,
|
2018-03-12 13:47:01 +00:00
|
|
|
const ColumnsDescription & columns_,
|
2019-08-24 21:20:20 +00:00
|
|
|
const ConstraintsDescription & constraints_,
|
2018-07-24 13:10:34 +00:00
|
|
|
ASTPtr remote_table_function_ptr_,
|
|
|
|
const String & cluster_name_,
|
2021-04-10 23:33:54 +00:00
|
|
|
ContextPtr context_,
|
2018-07-24 13:10:34 +00:00
|
|
|
const ASTPtr & sharding_key_,
|
2020-07-23 14:10:48 +00:00
|
|
|
const String & storage_policy_name_,
|
2019-10-25 19:07:47 +00:00
|
|
|
const String & relative_data_path_,
|
2021-01-07 14:14:41 +00:00
|
|
|
const DistributedSettings & distributed_settings_,
|
2020-10-14 12:19:29 +00:00
|
|
|
bool attach,
|
|
|
|
ClusterPtr owned_cluster_)
|
2021-04-23 12:18:23 +00:00
|
|
|
: StorageDistributed(
|
|
|
|
id_,
|
|
|
|
columns_,
|
|
|
|
constraints_,
|
|
|
|
String{},
|
|
|
|
String{},
|
|
|
|
String{},
|
|
|
|
cluster_name_,
|
|
|
|
context_,
|
|
|
|
sharding_key_,
|
|
|
|
storage_policy_name_,
|
|
|
|
relative_data_path_,
|
|
|
|
distributed_settings_,
|
|
|
|
attach,
|
|
|
|
std::move(owned_cluster_))
|
2018-07-24 13:10:34 +00:00
|
|
|
{
|
2019-12-04 16:06:55 +00:00
|
|
|
remote_table_function_ptr = std::move(remote_table_function_ptr_);
|
2018-07-24 13:10:34 +00:00
|
|
|
}
|
|
|
|
|
2020-11-07 21:30:40 +00:00
|
|
|
/// Chooses the stage up to which shards process the query before the initiator takes over.
///
/// Side effects on `query_info`:
///  - `query_info.cluster` is always set to the table's cluster;
///  - `query_info.optimized_cluster` is set when optimize_skip_unused_shards prunes shards.
/// These are computed here so that read() can reuse them (NOTE(review): read() appears to
/// consult them via query_info.getCluster()).
///
/// Decision order:
///  1. distributed_group_by_no_merge (AFTER_AGGREGATION variant honours distributed_push_down_limit,
///     any other non-zero value yields Complete);
///  2. distributed_push_down_limit;
///  3. nested distributed query (to_stage == WithMergeableState);
///  4. a single queried node -> Complete;
///  5. GROUP BY on the sharding key optimization;
///  6. otherwise WithMergeableState.
QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
    ContextPtr local_context,
    QueryProcessingStage::Enum to_stage,
    const StorageMetadataPtr & metadata_snapshot,
    SelectQueryInfo & query_info) const
{
    const Settings & settings = local_context->getSettingsRef();

    ClusterPtr target_cluster = getCluster();
    query_info.cluster = target_cluster;

    /// The pruned cluster is computed here rather than in read() (it would be needed there
    /// anyway) and published through query_info.
    const bool may_skip_shards = getClusterQueriedNodes(settings, target_cluster) > 1 && settings.optimize_skip_unused_shards;
    if (may_skip_shards)
    {
        if (ClusterPtr pruned_cluster = getOptimizedCluster(local_context, metadata_snapshot, query_info.query))
        {
            LOG_DEBUG(log, "Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): {}",
                makeFormattedListOfShards(pruned_cluster));
            target_cluster = pruned_cluster;
            query_info.optimized_cluster = target_cluster;
        }
        else
        {
            LOG_DEBUG(log, "Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - the query will be sent to all shards of the cluster{}",
                has_sharding_key ? "" : " (no sharding key)");
        }
    }

    if (settings.distributed_group_by_no_merge)
    {
        if (settings.distributed_group_by_no_merge == DISTRIBUTED_GROUP_BY_NO_MERGE_AFTER_AGGREGATION)
        {
            return settings.distributed_push_down_limit
                ? QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit
                : QueryProcessingStage::WithMergeableStateAfterAggregation;
        }

        /// Any other non-zero value: every shard finishes the query on its own and the initiator
        /// merely proxies results, so distributed_push_down_limit is intentionally not applied.
        return QueryProcessingStage::Complete;
    }

    if (settings.distributed_push_down_limit)
        return QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;

    /// A nested Distributed query must stop before Complete: its parent still merges the results.
    if (to_stage == QueryProcessingStage::WithMergeableState)
        return QueryProcessingStage::WithMergeableState;

    /// Exactly one queried node: the shard can fully process the query, the initiator only proxies.
    if (getClusterQueriedNodes(settings, target_cluster) == 1)
        return QueryProcessingStage::Complete;

    const bool can_group_by_sharding_key = settings.optimize_skip_unused_shards
        && settings.optimize_distributed_group_by_sharding_key
        && has_sharding_key
        && (settings.allow_nondeterministic_optimize_skip_unused_shards || sharding_key_is_deterministic);

    if (can_group_by_sharding_key)
    {
        /// All columns required by the sharding key expression are passed (not a subset): GROUP BY
        /// must contain every one of them for per-shard aggregation results to be disjoint.
        if (auto forced_stage = getOptimizedQueryProcessingStage(query_info, settings.extremes, sharding_key_expr->getRequiredColumns()))
        {
            LOG_DEBUG(log, "Force processing stage to {}", QueryProcessingStage::toString(*forced_stage));
            return *forced_stage;
        }
    }

    return QueryProcessingStage::WithMergeableState;
}
|
|
|
|
|
2020-08-03 13:54:14 +00:00
|
|
|
/// Pipe-returning read(): builds a QueryPlan through the QueryPlan-based read() overload and
/// converts it into a Pipe, with optimization and pipeline-building settings taken from
/// `local_context`.
Pipe StorageDistributed::read(
    const Names & column_names,
    const StorageMetadataPtr & metadata_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr local_context,
    QueryProcessingStage::Enum processed_stage,
    const size_t max_block_size,
    const unsigned num_streams)
{
    QueryPlan query_plan;
    read(query_plan, column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);

    const auto optimization_settings = QueryPlanOptimizationSettings::fromContext(local_context);
    const auto pipeline_settings = BuildQueryPipelineSettings::fromContext(local_context);
    return query_plan.convertToPipe(optimization_settings, pipeline_settings);
}
|
|
|
|
|
|
|
|
void StorageDistributed::read(
|
2020-09-18 14:16:53 +00:00
|
|
|
QueryPlan & query_plan,
|
2019-09-18 21:17:00 +00:00
|
|
|
const Names & column_names,
|
2020-06-17 14:37:21 +00:00
|
|
|
const StorageMetadataPtr & metadata_snapshot,
|
2020-09-20 17:52:17 +00:00
|
|
|
SelectQueryInfo & query_info,
|
2021-04-10 23:33:54 +00:00
|
|
|
ContextPtr local_context,
|
2018-04-19 14:47:09 +00:00
|
|
|
QueryProcessingStage::Enum processed_stage,
|
|
|
|
const size_t /*max_block_size*/,
|
|
|
|
const unsigned /*num_streams*/)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
const auto & modified_query_ast = rewriteSelectQuery(
|
2018-07-24 13:10:34 +00:00
|
|
|
query_info.query, remote_database, remote_table, remote_table_function_ptr);
|
2015-02-10 20:48:17 +00:00
|
|
|
|
2019-08-09 13:37:42 +00:00
|
|
|
Block header =
|
2021-04-10 23:33:54 +00:00
|
|
|
InterpreterSelectQuery(query_info.query, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
|
2018-02-15 18:54:12 +00:00
|
|
|
|
2021-03-09 19:00:38 +00:00
|
|
|
/// Return directly (with correct header) if no shard to query.
|
2021-03-29 19:02:34 +00:00
|
|
|
if (query_info.getCluster()->getShardsInfo().empty())
|
2021-03-09 19:00:38 +00:00
|
|
|
{
|
|
|
|
Pipe pipe(std::make_shared<NullSource>(header));
|
|
|
|
auto read_from_pipe = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
|
|
|
|
read_from_pipe->setStepDescription("Read from NullSource (Distributed)");
|
|
|
|
query_plan.addStep(std::move(read_from_pipe));
|
|
|
|
|
|
|
|
return;
|
|
|
|
}
|
2018-02-15 18:54:12 +00:00
|
|
|
|
2019-09-18 21:17:00 +00:00
|
|
|
bool has_virtual_shard_num_column = std::find(column_names.begin(), column_names.end(), "_shard_num") != column_names.end();
|
2020-06-17 14:37:21 +00:00
|
|
|
if (has_virtual_shard_num_column && !isVirtualColumn("_shard_num", metadata_snapshot))
|
2019-09-18 21:17:00 +00:00
|
|
|
has_virtual_shard_num_column = false;
|
|
|
|
|
2021-07-15 16:15:16 +00:00
|
|
|
StorageID main_table = StorageID::createEmpty();
|
|
|
|
if (!remote_table_function_ptr)
|
|
|
|
main_table = StorageID{remote_database, remote_table};
|
|
|
|
|
|
|
|
ClusterProxy::SelectStreamFactory select_stream_factory =
|
|
|
|
ClusterProxy::SelectStreamFactory(
|
2021-04-10 23:33:54 +00:00
|
|
|
header,
|
|
|
|
processed_stage,
|
2021-07-15 16:15:16 +00:00
|
|
|
has_virtual_shard_num_column);
|
2015-11-06 17:44:01 +00:00
|
|
|
|
2021-07-15 16:15:16 +00:00
|
|
|
ClusterProxy::executeQuery(
|
|
|
|
query_plan, header, processed_stage,
|
|
|
|
main_table, remote_table_function_ptr,
|
|
|
|
select_stream_factory, log, modified_query_ast,
|
|
|
|
local_context, query_info,
|
2021-03-07 15:51:01 +00:00
|
|
|
sharding_key_expr, sharding_key_column_name,
|
2021-06-04 13:09:59 +00:00
|
|
|
query_info.cluster);
|
2021-03-09 19:00:38 +00:00
|
|
|
|
|
|
|
/// This is a bug, it is possible only when there is no shards to query, and this is handled earlier.
|
|
|
|
if (!query_plan.isInitialized())
|
|
|
|
throw Exception("Pipeline is not initialized", ErrorCodes::LOGICAL_ERROR);
|
2012-05-21 20:38:34 +00:00
|
|
|
}
|
|
|
|
|
2017-03-09 03:34:09 +00:00
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
/// Creates the output stream for INSERT into this Distributed table.
///
/// The INSERT is synchronous when insert_distributed_sync or insert_shard_id is set, or when the
/// storage owns its cluster (the remote() table function case). Otherwise it is asynchronous,
/// which requires the storage to have its own data directory (a storage policy).
///
/// Throws:
///  - BAD_ARGUMENTS: asynchronous INSERT requested, but there is no data directory;
///  - STORAGE_REQUIRES_PARAMETER: more than one shard, no sharding key, and neither
///    insert_shard_id nor insert_distributed_one_random_shard is set;
///  - INVALID_SHARD_ID: insert_shard_id is larger than the number of shards.
BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
    auto cluster = getCluster();
    const auto & settings = local_context->getSettingsRef();

    /// E.g. a table belonging to DatabaseMemory has no data directory, so only synchronous INSERT works.
    if (!storage_policy && !owned_cluster && !settings.insert_distributed_sync && !settings.insert_shard_id)
        throw Exception("Storage " + getName() + " must have own data directory to enable asynchronous inserts",
            ErrorCodes::BAD_ARGUMENTS);

    const auto shard_count = cluster->getLocalShardCount() + cluster->getRemoteShardCount();

    /// Without a sharding key, rows can only be written to a multi-shard cluster when the target
    /// shard is fixed (insert_shard_id) or chosen at random (insert_distributed_one_random_shard).
    const bool rows_routable = settings.insert_shard_id || settings.insert_distributed_one_random_shard || has_sharding_key;
    if (!rows_routable && shard_count >= 2)
        throw Exception(
            "Method write is not supported by storage " + getName() + " with more than one shard and no sharding key provided",
            ErrorCodes::STORAGE_REQUIRES_PARAMETER);

    if (settings.insert_shard_id && settings.insert_shard_id > shard_count)
        throw Exception("Shard id should be range from 1 to shard number", ErrorCodes::INVALID_SHARD_ID);

    /// An owned cluster (remote() table function) always forces a synchronous insertion.
    const bool insert_sync = settings.insert_distributed_sync || settings.insert_shard_id || owned_cluster;
    auto timeout = settings.insert_distributed_timeout;

    Block sample_block = settings.insert_allow_materialized_columns
        ? metadata_snapshot->getSampleBlock()
        : metadata_snapshot->getSampleBlockNonMaterialized();

    /// The stream does not own `cluster`, but it does own the cluster's ConnectionPools.
    return std::make_shared<DistributedBlockOutputStream>(
        local_context, *this, metadata_snapshot,
        createInsertToRemoteTableQuery(remote_database, remote_table, sample_block),
        cluster, insert_sync, timeout, StorageID{remote_database, remote_table});
}
|
|
|
|
|
2017-03-09 03:34:09 +00:00
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
/// Attempts to run `INSERT INTO <this> SELECT ... FROM <source>` on every shard of the cluster
/// (parallel distributed INSERT SELECT) instead of streaming the data through the initiator.
///
/// Applicable only when the SELECT is a single query over exactly one table that is itself a
/// StorageDistributed on the same cluster as this table; otherwise returns nullptr
/// (NOTE(review): presumably the caller then falls back to the ordinary INSERT SELECT path).
///
/// The SELECT is rewritten to read the source's remote (underlying) table. With
/// parallel_distributed_insert_select == PARALLEL_DISTRIBUTED_INSERT_SELECT_ALL the INSERT
/// target is also rewritten to this table's remote table; otherwise the target is unchanged.
///
/// Local shards execute the INSERT through InterpreterInsertQuery; remote shards receive the
/// serialized query over a single connection (PoolMode::GET_ONE). All per-shard pipelines are
/// united into the returned pipeline.
///
/// Throws LOGICAL_ERROR if no usable connection is obtained for a remote shard.
QueryPipelinePtr StorageDistributed::distributedWrite(const ASTInsertQuery & query, ContextPtr local_context)
{
    const Settings & settings = local_context->getSettingsRef();
    std::shared_ptr<StorageDistributed> storage_src;
    auto & select = query.select->as<ASTSelectWithUnionQuery &>();
    auto new_query = std::dynamic_pointer_cast<ASTInsertQuery>(query.clone());
    if (select.list_of_selects->children.size() == 1)
    {
        if (auto * select_query = select.list_of_selects->children.at(0)->as<ASTSelectQuery>())
        {
            JoinedTables joined_tables(Context::createCopy(local_context), *select_query);

            if (joined_tables.tablesCount() == 1)
            {
                storage_src = std::dynamic_pointer_cast<StorageDistributed>(joined_tables.getLeftTableStorage());
                if (storage_src)
                {
                    const auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();
                    select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>();

                    auto new_select_query = std::dynamic_pointer_cast<ASTSelectQuery>(select_query->clone());
                    select_with_union_query->list_of_selects->children.push_back(new_select_query);

                    /// Read the source Distributed table's remote (underlying) table directly.
                    new_select_query->replaceDatabaseAndTable(storage_src->getRemoteDatabaseName(), storage_src->getRemoteTableName());

                    new_query->select = select_with_union_query;
                }
            }
        }
    }

    if (!storage_src || storage_src->getClusterName() != getClusterName())
    {
        return nullptr;
    }

    if (settings.parallel_distributed_insert_select == PARALLEL_DISTRIBUTED_INSERT_SELECT_ALL)
    {
        new_query->table_id = StorageID(getRemoteDatabaseName(), getRemoteTableName());
    }

    const auto & cluster = getCluster();
    const auto & shards_info = cluster->getShardsInfo();

    /// Serialized once; the same text is sent to every remote shard.
    const String new_query_str = queryToString(new_query);
    /// Timeouts depend only on the settings: compute them once instead of once per remote shard.
    const auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);

    std::vector<std::unique_ptr<QueryPipeline>> pipelines;
    pipelines.reserve(shards_info.size());

    for (const auto & shard_info : shards_info)
    {
        if (shard_info.isLocal())
        {
            InterpreterInsertQuery interpreter(new_query, local_context);
            pipelines.emplace_back(std::make_unique<QueryPipeline>(interpreter.execute().pipeline));
            continue;
        }

        auto connections = shard_info.pool->getMany(timeouts, &settings, PoolMode::GET_ONE);
        if (connections.empty() || connections.front().isNull())
            throw Exception(
                "Expected exactly one connection for shard " + toString(shard_info.shard_num), ErrorCodes::LOGICAL_ERROR);

        /// INSERT SELECT returns an empty block, hence the empty header and the EmptySink below.
        auto remote_query_executor
            = std::make_shared<RemoteQueryExecutor>(shard_info.pool, std::move(connections), new_query_str, Block{}, local_context);

        pipelines.emplace_back(std::make_unique<QueryPipeline>());
        pipelines.back()->init(Pipe(std::make_shared<RemoteSource>(remote_query_executor, false, settings.async_socket_for_remote)));
        pipelines.back()->setSinks([](const Block & header, QueryPipeline::StreamType) -> ProcessorPtr
        {
            return std::make_shared<EmptySink>(header);
        });
    }

    return std::make_unique<QueryPipeline>(QueryPipeline::unitePipelines(std::move(pipelines)));
}
|
|
|
|
|
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
void StorageDistributed::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const
|
2013-09-23 12:01:19 +00:00
|
|
|
{
|
2021-04-10 23:33:54 +00:00
|
|
|
auto name_deps = getDependentViewsByColumn(local_context);
|
2019-12-26 18:17:05 +00:00
|
|
|
for (const auto & command : commands)
|
|
|
|
{
|
|
|
|
if (command.type != AlterCommand::Type::ADD_COLUMN
|
|
|
|
&& command.type != AlterCommand::Type::MODIFY_COLUMN
|
|
|
|
&& command.type != AlterCommand::Type::DROP_COLUMN
|
2020-05-07 12:54:35 +00:00
|
|
|
&& command.type != AlterCommand::Type::COMMENT_COLUMN
|
|
|
|
&& command.type != AlterCommand::Type::RENAME_COLUMN)
|
2016-05-13 21:08:19 +00:00
|
|
|
|
2019-12-26 18:17:05 +00:00
|
|
|
throw Exception("Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(),
|
|
|
|
ErrorCodes::NOT_IMPLEMENTED);
|
2021-04-30 05:02:32 +00:00
|
|
|
if (command.type == AlterCommand::DROP_COLUMN && !command.clear)
|
2021-02-28 05:24:39 +00:00
|
|
|
{
|
2021-02-28 07:42:08 +00:00
|
|
|
const auto & deps_mv = name_deps[command.column_name];
|
2021-02-28 05:24:39 +00:00
|
|
|
if (!deps_mv.empty())
|
|
|
|
{
|
|
|
|
throw Exception(
|
|
|
|
"Trying to ALTER DROP column " + backQuoteIfNeed(command.column_name) + " which is referenced by materialized view "
|
|
|
|
+ toString(deps_mv),
|
|
|
|
ErrorCodes::ALTER_OF_COLUMN_IS_FORBIDDEN);
|
|
|
|
}
|
|
|
|
}
|
2019-12-26 18:17:05 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
void StorageDistributed::alter(const AlterCommands & params, ContextPtr local_context, TableLockHolder &)
|
2013-09-23 12:01:19 +00:00
|
|
|
{
|
2019-12-10 20:47:05 +00:00
|
|
|
auto table_id = getStorageID();
|
2019-08-26 14:50:34 +00:00
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
checkAlterIsPossible(params, local_context);
|
2020-06-09 17:28:29 +00:00
|
|
|
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
|
2021-04-10 23:33:54 +00:00
|
|
|
params.apply(new_metadata, local_context);
|
|
|
|
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata);
|
2020-06-15 16:55:33 +00:00
|
|
|
setInMemoryMetadata(new_metadata);
|
2013-09-23 12:01:19 +00:00
|
|
|
}
|
2014-02-04 15:44:15 +00:00
|
|
|
|
2017-03-09 03:34:09 +00:00
|
|
|
|
2017-06-06 17:06:14 +00:00
|
|
|
void StorageDistributed::startup()
|
|
|
|
{
|
2021-04-12 11:58:24 +00:00
|
|
|
if (remote_database.empty() && !remote_table_function_ptr && !getCluster()->maybeCrossReplication())
|
2020-05-23 22:24:01 +00:00
|
|
|
LOG_WARNING(log, "Name of remote database is empty. Default database will be used implicitly.");
|
2020-02-21 13:44:44 +00:00
|
|
|
|
2020-07-23 14:10:48 +00:00
|
|
|
if (!storage_policy)
|
2020-01-20 17:54:52 +00:00
|
|
|
return;
|
|
|
|
|
2021-06-24 07:07:31 +00:00
|
|
|
const auto & disks = data_volume->getDisks();
|
2021-06-27 15:22:34 +00:00
|
|
|
|
|
|
|
/// Make initialization for large number of disks parallel.
|
2021-06-24 07:07:31 +00:00
|
|
|
ThreadPool pool(disks.size());
|
|
|
|
|
|
|
|
for (const DiskPtr & disk : disks)
|
|
|
|
{
|
|
|
|
pool.scheduleOrThrowOnError([&]()
|
|
|
|
{
|
|
|
|
createDirectoryMonitors(disk);
|
|
|
|
});
|
|
|
|
}
|
|
|
|
pool.wait();
|
|
|
|
|
|
|
|
const auto & paths = getDataPaths();
|
|
|
|
std::vector<UInt64> last_increment(paths.size());
|
|
|
|
for (size_t i = 0; i < paths.size(); ++i)
|
|
|
|
{
|
|
|
|
pool.scheduleOrThrowOnError([&, i]()
|
|
|
|
{
|
|
|
|
last_increment[i] = getMaximumFileNumber(paths[i]);
|
|
|
|
});
|
|
|
|
}
|
|
|
|
pool.wait();
|
2020-01-20 17:54:52 +00:00
|
|
|
|
2021-06-24 07:07:31 +00:00
|
|
|
for (const auto inc : last_increment)
|
2020-01-20 17:54:52 +00:00
|
|
|
{
|
|
|
|
if (inc > file_names_increment.value)
|
|
|
|
file_names_increment.value.store(inc);
|
|
|
|
}
|
2020-05-23 22:24:01 +00:00
|
|
|
LOG_DEBUG(log, "Auto-increment is {}", file_names_increment.value);
|
2017-06-06 17:06:14 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2014-08-13 11:26:13 +00:00
|
|
|
void StorageDistributed::shutdown()
|
|
|
|
{
|
2020-04-24 23:03:27 +00:00
|
|
|
monitors_blocker.cancelForever();
|
|
|
|
|
2020-04-24 23:03:26 +00:00
|
|
|
std::lock_guard lock(cluster_nodes_mutex);
|
Fix DROP TABLE for Distributed (racy with INSERT)
<details>
```
drop() on T1275:
0 DB::StorageDistributed::drop (this=0x7f9ed34f0000) at ../contrib/libcxx/include/__hash_table:966
1 0x000000000d557242 in DB::DatabaseOnDisk::dropTable (this=0x7f9fc22706d8, context=..., table_name=...)
at ../contrib/libcxx/include/new:340
2 0x000000000d6fcf7c in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f9e42560dc0, query=...)
at ../contrib/libcxx/include/memory:3826
3 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f9e42560dc0) at ../src/Interpreters/InterpreterDropQuery.cpp:50
4 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>,
stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420
5 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>,
may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487
6 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254
7 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326
8 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000)
at ../contrib/poco/Net/src/TCPServerConnection.cpp:43
9 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00)
at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114
10 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199
11 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>)
at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401
12 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477
13 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
StorageDistributedDirectoryMonitor on T166:
0 DB::StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor (this=0x7f9ea7ab1400, storage_=..., path_=...,
pool_=..., monitor_blocker_=..., bg_pool_=...) at ../src/Storages/Distributed/DirectoryMonitor.cpp:81
1 0x000000000dbf684e in std::__1::make_unique<> () at ../contrib/libcxx/include/memory:3474
2 DB::StorageDistributed::requireDirectoryMonitor (this=0x7f9ed34f0000, disk=..., name=...)
at ../src/Storages/StorageDistributed.cpp:682
3 0x000000000de3d5fa in DB::DistributedBlockOutputStream::writeToShard (this=this@entry=0x7f9ed39c7418, block=..., dir_names=...)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:634
4 0x000000000de3e214 in DB::DistributedBlockOutputStream::writeAsyncImpl (this=this@entry=0x7f9ed39c7418, block=...,
shard_id=shard_id@entry=79) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:539
5 0x000000000de3e47b in DB::DistributedBlockOutputStream::writeSplitAsync (this=this@entry=0x7f9ed39c7418, block=...)
at ../contrib/libcxx/include/vector:1546
6 0x000000000de3eab0 in DB::DistributedBlockOutputStream::writeAsync (block=..., this=0x7f9ed39c7418)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:141
7 DB::DistributedBlockOutputStream::write (this=0x7f9ed39c7418, block=...)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:135
8 0x000000000d73b376 in DB::PushingToViewsBlockOutputStream::write (this=this@entry=0x7f9ea7a8cf58, block=...)
at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:157
9 0x000000000d7853eb in DB::AddingDefaultBlockOutputStream::write (this=0x7f9ed383d118, block=...)
at ../contrib/libcxx/include/memory:3826
10 0x000000000d740790 in DB::SquashingBlockOutputStream::write (this=0x7f9ed383de18, block=...)
at ../contrib/libcxx/include/memory:3826
11 0x000000000d68c308 in DB::CountingBlockOutputStream::write (this=0x7f9ea7ac6d60, block=...)
at ../contrib/libcxx/include/memory:3826
12 0x000000000ddab449 in DB::StorageBuffer::writeBlockToDestination (this=this@entry=0x7f9fbd56a000, block=..., table=...)
at ../src/Storages/StorageBuffer.cpp:747
13 0x000000000ddabfa6 in DB::StorageBuffer::flushBuffer (this=this@entry=0x7f9fbd56a000, buffer=...,
check_thresholds=check_thresholds@entry=true, locked=locked@entry=false, reset_block_structure=reset_block_structure@entry=false)
at ../src/Storages/StorageBuffer.cpp:661
14 0x000000000ddac415 in DB::StorageBuffer::flushAllBuffers (reset_blocks_structure=false, check_thresholds=true, this=0x7f9fbd56a000)
at ../src/Storages/StorageBuffer.cpp:605
shutdown() on T1275:
0 DB::StorageDistributed::shutdown (this=0x7f9ed34f0000) at ../contrib/libcxx/include/atomic:1612
1 0x000000000d6fd938 in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f98530c79a0, query=...)
at ../src/Storages/TableLockHolder.h:12
2 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f98530c79a0) at ../src/Interpreters/InterpreterDropQuery.cpp:50
3 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>,
stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420
4 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>,
may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487
5 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254
6 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326
7 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000)
at ../contrib/poco/Net/src/TCPServerConnection.cpp:43
8 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00)
at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114
9 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199
10 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>)
at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401
11 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477
12 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
```
</details>
2020-10-26 20:01:06 +00:00
|
|
|
|
|
|
|
LOG_DEBUG(log, "Joining background threads for async INSERT");
|
2017-07-27 15:24:39 +00:00
|
|
|
cluster_nodes_data.clear();
|
Fix DROP TABLE for Distributed (racy with INSERT)
<details>
```
drop() on T1275:
0 DB::StorageDistributed::drop (this=0x7f9ed34f0000) at ../contrib/libcxx/include/__hash_table:966
1 0x000000000d557242 in DB::DatabaseOnDisk::dropTable (this=0x7f9fc22706d8, context=..., table_name=...)
at ../contrib/libcxx/include/new:340
2 0x000000000d6fcf7c in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f9e42560dc0, query=...)
at ../contrib/libcxx/include/memory:3826
3 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f9e42560dc0) at ../src/Interpreters/InterpreterDropQuery.cpp:50
4 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>,
stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420
5 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>,
may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487
6 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254
7 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326
8 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000)
at ../contrib/poco/Net/src/TCPServerConnection.cpp:43
9 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00)
at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114
10 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199
11 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>)
at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401
12 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477
13 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
StorageDistributedDirectoryMonitor on T166:
0 DB::StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor (this=0x7f9ea7ab1400, storage_=..., path_=...,
pool_=..., monitor_blocker_=..., bg_pool_=...) at ../src/Storages/Distributed/DirectoryMonitor.cpp:81
1 0x000000000dbf684e in std::__1::make_unique<> () at ../contrib/libcxx/include/memory:3474
2 DB::StorageDistributed::requireDirectoryMonitor (this=0x7f9ed34f0000, disk=..., name=...)
at ../src/Storages/StorageDistributed.cpp:682
3 0x000000000de3d5fa in DB::DistributedBlockOutputStream::writeToShard (this=this@entry=0x7f9ed39c7418, block=..., dir_names=...)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:634
4 0x000000000de3e214 in DB::DistributedBlockOutputStream::writeAsyncImpl (this=this@entry=0x7f9ed39c7418, block=...,
shard_id=shard_id@entry=79) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:539
5 0x000000000de3e47b in DB::DistributedBlockOutputStream::writeSplitAsync (this=this@entry=0x7f9ed39c7418, block=...)
at ../contrib/libcxx/include/vector:1546
6 0x000000000de3eab0 in DB::DistributedBlockOutputStream::writeAsync (block=..., this=0x7f9ed39c7418)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:141
7 DB::DistributedBlockOutputStream::write (this=0x7f9ed39c7418, block=...)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:135
8 0x000000000d73b376 in DB::PushingToViewsBlockOutputStream::write (this=this@entry=0x7f9ea7a8cf58, block=...)
at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:157
9 0x000000000d7853eb in DB::AddingDefaultBlockOutputStream::write (this=0x7f9ed383d118, block=...)
at ../contrib/libcxx/include/memory:3826
10 0x000000000d740790 in DB::SquashingBlockOutputStream::write (this=0x7f9ed383de18, block=...)
at ../contrib/libcxx/include/memory:3826
11 0x000000000d68c308 in DB::CountingBlockOutputStream::write (this=0x7f9ea7ac6d60, block=...)
at ../contrib/libcxx/include/memory:3826
12 0x000000000ddab449 in DB::StorageBuffer::writeBlockToDestination (this=this@entry=0x7f9fbd56a000, block=..., table=...)
at ../src/Storages/StorageBuffer.cpp:747
13 0x000000000ddabfa6 in DB::StorageBuffer::flushBuffer (this=this@entry=0x7f9fbd56a000, buffer=...,
check_thresholds=check_thresholds@entry=true, locked=locked@entry=false, reset_block_structure=reset_block_structure@entry=false)
at ../src/Storages/StorageBuffer.cpp:661
14 0x000000000ddac415 in DB::StorageBuffer::flushAllBuffers (reset_blocks_structure=false, check_thresholds=true, this=0x7f9fbd56a000)
at ../src/Storages/StorageBuffer.cpp:605
shutdown() on T1275:
0 DB::StorageDistributed::shutdown (this=0x7f9ed34f0000) at ../contrib/libcxx/include/atomic:1612
1 0x000000000d6fd938 in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f98530c79a0, query=...)
at ../src/Storages/TableLockHolder.h:12
2 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f98530c79a0) at ../src/Interpreters/InterpreterDropQuery.cpp:50
3 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>,
stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420
4 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>,
may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487
5 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254
6 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326
7 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000)
at ../contrib/poco/Net/src/TCPServerConnection.cpp:43
8 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00)
at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114
9 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199
10 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>)
at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401
11 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477
12 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
```
</details>
2020-10-26 20:01:06 +00:00
|
|
|
LOG_DEBUG(log, "Background threads for async INSERT joined");
|
2014-08-13 11:26:13 +00:00
|
|
|
}
|
2020-07-16 20:35:23 +00:00
|
|
|
void StorageDistributed::drop()
|
|
|
|
{
|
Fix DROP TABLE for Distributed (racy with INSERT)
<details>
```
drop() on T1275:
0 DB::StorageDistributed::drop (this=0x7f9ed34f0000) at ../contrib/libcxx/include/__hash_table:966
1 0x000000000d557242 in DB::DatabaseOnDisk::dropTable (this=0x7f9fc22706d8, context=..., table_name=...)
at ../contrib/libcxx/include/new:340
2 0x000000000d6fcf7c in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f9e42560dc0, query=...)
at ../contrib/libcxx/include/memory:3826
3 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f9e42560dc0) at ../src/Interpreters/InterpreterDropQuery.cpp:50
4 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>,
stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420
5 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>,
may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487
6 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254
7 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326
8 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000)
at ../contrib/poco/Net/src/TCPServerConnection.cpp:43
9 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00)
at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114
10 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199
11 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>)
at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401
12 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477
13 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
StorageDistributedDirectoryMonitor on T166:
0 DB::StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor (this=0x7f9ea7ab1400, storage_=..., path_=...,
pool_=..., monitor_blocker_=..., bg_pool_=...) at ../src/Storages/Distributed/DirectoryMonitor.cpp:81
1 0x000000000dbf684e in std::__1::make_unique<> () at ../contrib/libcxx/include/memory:3474
2 DB::StorageDistributed::requireDirectoryMonitor (this=0x7f9ed34f0000, disk=..., name=...)
at ../src/Storages/StorageDistributed.cpp:682
3 0x000000000de3d5fa in DB::DistributedBlockOutputStream::writeToShard (this=this@entry=0x7f9ed39c7418, block=..., dir_names=...)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:634
4 0x000000000de3e214 in DB::DistributedBlockOutputStream::writeAsyncImpl (this=this@entry=0x7f9ed39c7418, block=...,
shard_id=shard_id@entry=79) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:539
5 0x000000000de3e47b in DB::DistributedBlockOutputStream::writeSplitAsync (this=this@entry=0x7f9ed39c7418, block=...)
at ../contrib/libcxx/include/vector:1546
6 0x000000000de3eab0 in DB::DistributedBlockOutputStream::writeAsync (block=..., this=0x7f9ed39c7418)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:141
7 DB::DistributedBlockOutputStream::write (this=0x7f9ed39c7418, block=...)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:135
8 0x000000000d73b376 in DB::PushingToViewsBlockOutputStream::write (this=this@entry=0x7f9ea7a8cf58, block=...)
at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:157
9 0x000000000d7853eb in DB::AddingDefaultBlockOutputStream::write (this=0x7f9ed383d118, block=...)
at ../contrib/libcxx/include/memory:3826
10 0x000000000d740790 in DB::SquashingBlockOutputStream::write (this=0x7f9ed383de18, block=...)
at ../contrib/libcxx/include/memory:3826
11 0x000000000d68c308 in DB::CountingBlockOutputStream::write (this=0x7f9ea7ac6d60, block=...)
at ../contrib/libcxx/include/memory:3826
12 0x000000000ddab449 in DB::StorageBuffer::writeBlockToDestination (this=this@entry=0x7f9fbd56a000, block=..., table=...)
at ../src/Storages/StorageBuffer.cpp:747
13 0x000000000ddabfa6 in DB::StorageBuffer::flushBuffer (this=this@entry=0x7f9fbd56a000, buffer=...,
check_thresholds=check_thresholds@entry=true, locked=locked@entry=false, reset_block_structure=reset_block_structure@entry=false)
at ../src/Storages/StorageBuffer.cpp:661
14 0x000000000ddac415 in DB::StorageBuffer::flushAllBuffers (reset_blocks_structure=false, check_thresholds=true, this=0x7f9fbd56a000)
at ../src/Storages/StorageBuffer.cpp:605
shutdown() on T1275:
0 DB::StorageDistributed::shutdown (this=0x7f9ed34f0000) at ../contrib/libcxx/include/atomic:1612
1 0x000000000d6fd938 in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f98530c79a0, query=...)
at ../src/Storages/TableLockHolder.h:12
2 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f98530c79a0) at ../src/Interpreters/InterpreterDropQuery.cpp:50
3 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>,
stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420
4 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>,
may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487
5 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254
6 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326
7 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000)
at ../contrib/poco/Net/src/TCPServerConnection.cpp:43
8 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00)
at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114
9 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199
10 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>)
at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401
11 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477
12 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
```
</details>
2020-10-26 20:01:06 +00:00
|
|
|
// Some INSERT in-between shutdown() and drop() can call
|
|
|
|
// requireDirectoryMonitor() again, so call shutdown() to clear them, but
|
|
|
|
// when the drop() (this function) executed none of INSERT is allowed in
|
|
|
|
// parallel.
|
|
|
|
//
|
|
|
|
// And second time shutdown() should be fast, since none of
|
|
|
|
// DirectoryMonitor should do anything, because ActionBlocker is canceled
|
|
|
|
// (in shutdown()).
|
|
|
|
shutdown();
|
2020-07-16 20:35:23 +00:00
|
|
|
|
|
|
|
// Distributed table w/o sharding_key does not allows INSERTs
|
|
|
|
if (relative_data_path.empty())
|
|
|
|
return;
|
|
|
|
|
|
|
|
LOG_DEBUG(log, "Removing pending blocks for async INSERT from filesystem on DROP TABLE");
|
|
|
|
|
2020-09-15 09:26:56 +00:00
|
|
|
auto disks = data_volume->getDisks();
|
2020-07-16 20:35:23 +00:00
|
|
|
for (const auto & disk : disks)
|
|
|
|
disk->removeRecursive(relative_data_path);
|
|
|
|
|
|
|
|
LOG_DEBUG(log, "Removed");
|
|
|
|
}
|
2014-08-13 11:26:13 +00:00
|
|
|
|
2020-01-20 17:54:52 +00:00
|
|
|
/// Returns `disk->getPath() + relative_data_path` for every disk of the data
/// volume, or an empty list when the table has no relative data path.
Strings StorageDistributed::getDataPaths() const
{
    if (relative_data_path.empty())
        return {};

    Strings result;
    for (const DiskPtr & disk : data_volume->getDisks())
        result.emplace_back(disk->getPath() + relative_data_path);

    return result;
}
|
2017-03-09 03:34:09 +00:00
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
void StorageDistributed::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)
|
2018-04-21 00:35:20 +00:00
|
|
|
{
|
2018-06-09 15:48:22 +00:00
|
|
|
std::lock_guard lock(cluster_nodes_mutex);
|
2018-04-21 00:35:20 +00:00
|
|
|
|
2020-07-16 20:35:23 +00:00
|
|
|
LOG_DEBUG(log, "Removing pending blocks for async INSERT from filesystem on TRUNCATE TABLE");
|
|
|
|
|
2018-06-09 15:48:22 +00:00
|
|
|
for (auto it = cluster_nodes_data.begin(); it != cluster_nodes_data.end();)
|
2018-04-21 00:35:20 +00:00
|
|
|
{
|
2021-02-08 19:07:30 +00:00
|
|
|
it->second.directory_monitor->shutdownAndDropAllData();
|
2018-06-09 15:48:22 +00:00
|
|
|
it = cluster_nodes_data.erase(it);
|
2018-04-21 00:35:20 +00:00
|
|
|
}
|
2020-07-16 20:35:23 +00:00
|
|
|
|
|
|
|
LOG_DEBUG(log, "Removed");
|
2018-04-21 00:35:20 +00:00
|
|
|
}
|
2017-03-09 03:34:09 +00:00
|
|
|
|
2020-03-29 07:43:40 +00:00
|
|
|
/// Accessor: hands out the `storage_policy` member unchanged.
StoragePolicyPtr StorageDistributed::getStoragePolicy() const
{
    const StoragePolicyPtr & policy = storage_policy;
    return policy;
}
|
|
|
|
|
2021-01-09 12:26:37 +00:00
|
|
|
void StorageDistributed::createDirectoryMonitors(const DiskPtr & disk)
|
2014-08-13 11:26:13 +00:00
|
|
|
{
|
2021-01-09 12:26:37 +00:00
|
|
|
const std::string path(disk->getPath() + relative_data_path);
|
2021-04-27 00:05:43 +00:00
|
|
|
fs::create_directories(path);
|
2014-08-14 11:50:36 +00:00
|
|
|
|
2019-07-31 22:37:41 +00:00
|
|
|
std::filesystem::directory_iterator begin(path);
|
|
|
|
std::filesystem::directory_iterator end;
|
2017-05-10 06:39:37 +00:00
|
|
|
for (auto it = begin; it != end; ++it)
|
2020-11-04 18:58:43 +00:00
|
|
|
{
|
|
|
|
const auto & dir_path = it->path();
|
|
|
|
if (std::filesystem::is_directory(dir_path))
|
|
|
|
{
|
|
|
|
const auto & tmp_path = dir_path / "tmp";
|
|
|
|
|
|
|
|
/// "tmp" created by DistributedBlockOutputStream
|
|
|
|
if (std::filesystem::is_directory(tmp_path) && std::filesystem::is_empty(tmp_path))
|
|
|
|
std::filesystem::remove(tmp_path);
|
|
|
|
|
|
|
|
if (std::filesystem::is_empty(dir_path))
|
|
|
|
{
|
2020-11-22 17:13:40 +00:00
|
|
|
LOG_DEBUG(log, "Removing {} (used for async INSERT into Distributed)", dir_path.string());
|
2020-11-04 18:58:43 +00:00
|
|
|
/// Will be created by DistributedBlockOutputStream on demand.
|
|
|
|
std::filesystem::remove(dir_path);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2021-06-24 07:07:31 +00:00
|
|
|
requireDirectoryMonitor(disk, dir_path.filename().string(), /* startup= */ true);
|
2020-11-04 18:58:43 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2014-08-13 11:26:13 +00:00
|
|
|
}
|
|
|
|
|
2017-03-09 03:34:09 +00:00
|
|
|
|
2021-06-24 07:07:31 +00:00
|
|
|
/// Returns the directory monitor for subdirectory `name` on `disk`, keyed by
/// `disk->getPath() + name` in cluster_nodes_data.
///
/// - startup == false: under cluster_nodes_mutex, reuses an existing monitor or
///   creates one (with its connection pool) if the entry has none.
/// - startup == true: always creates the monitor before taking the lock, then
///   stores it; no monitor for this key is expected to exist yet (asserted).
StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(const DiskPtr & disk, const std::string & name, bool startup)
{
    const std::string key = disk->getPath() + name;

    auto make_node_data = [&]
    {
        ClusterNodeData node;
        node.connection_pool = StorageDistributedDirectoryMonitor::createPool(name, *this);
        node.directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(
            *this, disk, relative_data_path + name,
            node.connection_pool,
            monitors_blocker,
            getContext()->getDistributedSchedulePool());
        return node;
    };

    if (!startup)
    {
        std::lock_guard lock(cluster_nodes_mutex);
        auto & node_data = cluster_nodes_data[key];
        if (!node_data.directory_monitor)
            node_data = make_node_data();
        return *node_data.directory_monitor;
    }

    /// In case of startup the lock can be acquired later (after creation).
    auto prepared = make_node_data();
    std::lock_guard lock(cluster_nodes_mutex);
    auto & node_data = cluster_nodes_data[key];
    assert(!node_data.directory_monitor);
    node_data = std::move(prepared);
    return *node_data.directory_monitor;
}
|
|
|
|
|
2020-06-03 23:50:47 +00:00
|
|
|
std::vector<StorageDistributedDirectoryMonitor::Status> StorageDistributed::getDirectoryMonitorsStatuses() const
|
2020-06-02 23:47:32 +00:00
|
|
|
{
|
2020-06-03 23:50:47 +00:00
|
|
|
std::vector<StorageDistributedDirectoryMonitor::Status> statuses;
|
|
|
|
std::lock_guard lock(cluster_nodes_mutex);
|
2020-06-06 15:57:52 +00:00
|
|
|
statuses.reserve(cluster_nodes_data.size());
|
2020-06-04 17:23:46 +00:00
|
|
|
for (const auto & node : cluster_nodes_data)
|
2020-06-03 23:50:47 +00:00
|
|
|
statuses.push_back(node.second.directory_monitor->getStatus());
|
|
|
|
return statuses;
|
2020-06-02 23:47:32 +00:00
|
|
|
}
|
|
|
|
|
2021-01-26 18:45:36 +00:00
|
|
|
std::optional<UInt64> StorageDistributed::totalBytes(const Settings &) const
|
|
|
|
{
|
|
|
|
UInt64 total_bytes = 0;
|
|
|
|
for (const auto & status : getDirectoryMonitorsStatuses())
|
|
|
|
total_bytes += status.bytes_count;
|
|
|
|
return total_bytes;
|
|
|
|
}
|
|
|
|
|
2015-09-18 13:36:10 +00:00
|
|
|
size_t StorageDistributed::getShardCount() const
|
|
|
|
{
|
2018-03-16 02:08:31 +00:00
|
|
|
return getCluster()->getShardCount();
|
2016-10-10 08:44:52 +00:00
|
|
|
}
|
|
|
|
|
2020-01-20 17:54:52 +00:00
|
|
|
/// Returns `owned_cluster` when set; otherwise looks up `cluster_name` in the
/// storage's context (on every call).
ClusterPtr StorageDistributed::getCluster() const
{
    if (owned_cluster)
        return owned_cluster;

    return getContext()->getCluster(cluster_name);
}
|
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
/// Tries to narrow the cluster with skipUnusedShards() when the table has a
/// sharding key that may be used (deterministic, or nondeterminism allowed).
/// If that yields a cluster it is returned. Otherwise returns an empty
/// ClusterPtr, unless force_optimize_skip_unused_shards requires the
/// optimization, in which case UNABLE_TO_SKIP_UNUSED_SHARDS is thrown with
/// the reason.
ClusterPtr StorageDistributed::getOptimizedCluster(
    ContextPtr local_context, const StorageMetadataPtr & metadata_snapshot, const ASTPtr & query_ptr) const
{
    ClusterPtr cluster = getCluster();
    const Settings & settings = local_context->getSettingsRef();

    const bool sharding_key_is_usable = settings.allow_nondeterministic_optimize_skip_unused_shards || sharding_key_is_deterministic;

    if (has_sharding_key && sharding_key_is_usable)
    {
        if (ClusterPtr optimized = skipUnusedShards(cluster, query_ptr, metadata_snapshot, local_context))
            return optimized;
    }

    const UInt64 force = settings.force_optimize_skip_unused_shards;
    if (!force)
        return {};

    WriteBufferFromOwnString exception_message;
    if (!has_sharding_key)
        exception_message << "No sharding key";
    else if (!sharding_key_is_usable)
        exception_message << "Sharding key is not deterministic";
    else
        exception_message << "Sharding key " << sharding_key_column_name << " is not used";

    const bool must_throw = force == FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_ALWAYS
        || (force == FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_HAS_SHARDING_KEY && has_sharding_key);
    if (must_throw)
        throw Exception(exception_message.str(), ErrorCodes::UNABLE_TO_SKIP_UNUSED_SHARDS);

    return {};
}
|
|
|
|
|
2020-05-01 08:31:05 +00:00
|
|
|
/// Maps each row of the evaluated sharding key column `result` to a shard using
/// the cluster's slot-to-shard table. Accepted key types: UInt8..UInt64 and
/// Int8..Int64, optionally wrapped in LowCardinality; other types throw TYPE_MISMATCH.
IColumn::Selector StorageDistributed::createSelector(const ClusterPtr cluster, const ColumnWithTypeAndName & result)
{
    const auto & slot_to_shard = cluster->getSlotToShard();

    // If result.type is DataTypeLowCardinality, do shard according to its dictionaryType.
    // The column is converted to a full column only after a supported type matched.
    auto key_type = result.type;
    bool is_low_cardinality = false;
    if (const auto * type_low_cardinality = typeid_cast<const DataTypeLowCardinality *>(result.type.get()))
    {
        key_type = type_low_cardinality->getDictionaryType();
        is_low_cardinality = true;
    }

#define CREATE_FOR_TYPE(TYPE) \
    if (typeid_cast<const DataType##TYPE *>(key_type.get())) \
    { \
        if (is_low_cardinality) \
            return createBlockSelector<TYPE>(*result.column->convertToFullColumnIfLowCardinality(), slot_to_shard); \
        return createBlockSelector<TYPE>(*result.column, slot_to_shard); \
    }

    CREATE_FOR_TYPE(UInt8)
    CREATE_FOR_TYPE(UInt16)
    CREATE_FOR_TYPE(UInt32)
    CREATE_FOR_TYPE(UInt64)
    CREATE_FOR_TYPE(Int8)
    CREATE_FOR_TYPE(Int16)
    CREATE_FOR_TYPE(Int32)
    CREATE_FOR_TYPE(Int64)

#undef CREATE_FOR_TYPE

    throw Exception{"Sharding key expression does not evaluate to an integer type", ErrorCodes::TYPE_MISMATCH};
}
|
|
|
|
|
2018-12-19 12:38:13 +00:00
|
|
|
/// Returns a new cluster with fewer shards if constant folding for `sharding_key_expr` is possible
|
2019-08-19 20:28:24 +00:00
|
|
|
/// using constraints from "PREWHERE" and "WHERE" conditions, otherwise returns `nullptr`
|
2020-06-17 16:39:58 +00:00
|
|
|
ClusterPtr StorageDistributed::skipUnusedShards(
|
|
|
|
ClusterPtr cluster,
|
|
|
|
const ASTPtr & query_ptr,
|
|
|
|
const StorageMetadataPtr & metadata_snapshot,
|
2021-04-10 23:33:54 +00:00
|
|
|
ContextPtr local_context) const
|
2018-12-19 12:38:13 +00:00
|
|
|
{
|
2020-03-24 07:51:54 +00:00
|
|
|
const auto & select = query_ptr->as<ASTSelectQuery &>();
|
2018-12-19 12:38:13 +00:00
|
|
|
|
2019-08-19 20:28:24 +00:00
|
|
|
if (!select.prewhere() && !select.where())
|
|
|
|
{
|
2018-12-19 12:38:13 +00:00
|
|
|
return nullptr;
|
2019-08-19 20:28:24 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
ASTPtr condition_ast;
|
|
|
|
if (select.prewhere() && select.where())
|
|
|
|
{
|
|
|
|
condition_ast = makeASTFunction("and", select.prewhere()->clone(), select.where()->clone());
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
condition_ast = select.prewhere() ? select.prewhere()->clone() : select.where()->clone();
|
|
|
|
}
|
2018-12-19 12:38:13 +00:00
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
replaceConstantExpressions(condition_ast, local_context, metadata_snapshot->getColumns().getAll(), shared_from_this(), metadata_snapshot);
|
2018-12-19 12:38:13 +00:00
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
size_t limit = local_context->getSettingsRef().optimize_skip_unused_shards_limit;
|
2021-03-08 07:05:56 +00:00
|
|
|
if (!limit || limit > SSIZE_MAX)
|
|
|
|
{
|
2021-06-28 18:55:30 +00:00
|
|
|
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "optimize_skip_unused_shards_limit out of range (0, {}]", SSIZE_MAX);
|
2021-03-08 07:05:56 +00:00
|
|
|
}
|
|
|
|
// To interpret limit==0 as limit is reached
|
|
|
|
++limit;
|
|
|
|
const auto blocks = evaluateExpressionOverConstantCondition(condition_ast, sharding_key_expr, limit);
|
|
|
|
|
|
|
|
if (!limit)
|
2018-12-19 12:38:13 +00:00
|
|
|
{
|
2021-04-23 17:51:47 +00:00
|
|
|
LOG_DEBUG(log,
|
2021-03-08 07:05:56 +00:00
|
|
|
"Number of values for sharding key exceeds optimize_skip_unused_shards_limit={}, "
|
|
|
|
"try to increase it, but note that this may increase query processing time.",
|
2021-04-10 23:33:54 +00:00
|
|
|
local_context->getSettingsRef().optimize_skip_unused_shards_limit);
|
2018-12-19 12:38:13 +00:00
|
|
|
return nullptr;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Can't get definite answer if we can skip any shards
|
|
|
|
if (!blocks)
|
|
|
|
{
|
|
|
|
return nullptr;
|
|
|
|
}
|
|
|
|
|
|
|
|
std::set<int> shards;
|
|
|
|
|
|
|
|
for (const auto & block : *blocks)
|
|
|
|
{
|
|
|
|
if (!block.has(sharding_key_column_name))
|
|
|
|
throw Exception("sharding_key_expr should evaluate as a single row", ErrorCodes::TOO_MANY_ROWS);
|
|
|
|
|
2020-03-18 03:27:32 +00:00
|
|
|
const ColumnWithTypeAndName & result = block.getByName(sharding_key_column_name);
|
2018-12-19 12:38:13 +00:00
|
|
|
const auto selector = createSelector(cluster, result);
|
|
|
|
|
|
|
|
shards.insert(selector.begin(), selector.end());
|
|
|
|
}
|
|
|
|
|
|
|
|
return cluster->getClusterWithMultipleShards({shards.begin(), shards.end()});
|
|
|
|
}
|
|
|
|
|
2019-04-08 05:13:16 +00:00
|
|
|
/// For ActionLocks::DistributedSend returns the lock obtained from
/// monitors_blocker.cancel(); for any other type returns an empty ActionLock.
ActionLock StorageDistributed::getActionLock(StorageActionBlockType type)
{
    if (type != ActionLocks::DistributedSend)
        return {};

    return monitors_blocker.cancel();
}
|
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
/// Calls flushAllData() on every directory monitor, holding a shared table
/// lock for the duration (to synchronize with TRUNCATE).
void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context)
{
    /// Sync SYSTEM FLUSH DISTRIBUTED with TRUNCATE
    auto table_lock = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);

    /// Copy the monitor pointers under cluster_nodes_mutex; flush after releasing it.
    std::vector<std::shared_ptr<StorageDistributedDirectoryMonitor>> monitors;
    {
        std::lock_guard lock(cluster_nodes_mutex);
        monitors.reserve(cluster_nodes_data.size());
        for (const auto & entry : cluster_nodes_data)
            monitors.push_back(entry.second.directory_monitor);
    }

    /// TODO: Maybe it should be executed in parallel
    for (const auto & monitor : monitors)
        monitor->flushAllData();
}
|
|
|
|
|
2020-04-07 14:05:51 +00:00
|
|
|
void StorageDistributed::rename(const String & new_path_to_table_data, const StorageID & new_table_id)
|
2019-12-19 19:39:49 +00:00
|
|
|
{
|
2020-09-18 19:25:56 +00:00
|
|
|
assert(relative_data_path != new_path_to_table_data);
|
|
|
|
if (!relative_data_path.empty())
|
2020-01-20 17:54:52 +00:00
|
|
|
renameOnDisk(new_path_to_table_data);
|
2020-04-07 14:05:51 +00:00
|
|
|
renameInMemory(new_table_id);
|
2020-01-20 17:54:52 +00:00
|
|
|
}
|
2020-07-23 14:10:48 +00:00
|
|
|
|
|
|
|
|
2020-12-23 16:04:05 +00:00
|
|
|
/// Picks a random shard index with probability proportional to `shard.weight`.
/// The total weight is asserted to be positive.
size_t StorageDistributed::getRandomShardIndex(const Cluster::ShardsInfo & shards)
{
    UInt32 total_weight = 0;
    for (const auto & shard : shards)
        total_weight += shard.weight;

    assert(total_weight > 0);

    size_t point = 0;
    {
        /// rng is accessed only under rng_mutex.
        std::lock_guard lock(rng_mutex);
        point = std::uniform_int_distribution<size_t>(0, total_weight - 1)(rng);
    }

    /// Walk the cumulative weights until `point` falls inside a shard's range.
    size_t index = 0;
    for (const auto & shard : shards)
    {
        if (point < shard.weight)
            return index;
        point -= shard.weight;
        ++index;
    }

    __builtin_unreachable();
}
|
|
|
|
|
|
|
|
|
2020-01-20 17:54:52 +00:00
|
|
|
void StorageDistributed::renameOnDisk(const String & new_path_to_table_data)
|
|
|
|
{
|
2020-09-15 09:26:56 +00:00
|
|
|
for (const DiskPtr & disk : data_volume->getDisks())
|
2019-12-19 19:39:49 +00:00
|
|
|
{
|
2021-06-24 10:00:33 +00:00
|
|
|
disk->createDirectories(new_path_to_table_data);
|
2020-09-18 19:08:53 +00:00
|
|
|
disk->moveDirectory(relative_data_path, new_path_to_table_data);
|
2020-01-20 17:54:52 +00:00
|
|
|
|
2020-09-18 19:08:53 +00:00
|
|
|
auto new_path = disk->getPath() + new_path_to_table_data;
|
2020-05-23 22:24:01 +00:00
|
|
|
LOG_DEBUG(log, "Updating path to {}", new_path);
|
2020-01-20 17:54:52 +00:00
|
|
|
|
2019-12-19 19:39:49 +00:00
|
|
|
std::lock_guard lock(cluster_nodes_mutex);
|
|
|
|
for (auto & node : cluster_nodes_data)
|
2021-01-09 12:26:37 +00:00
|
|
|
node.second.directory_monitor->updatePath(new_path_to_table_data);
|
2019-12-19 19:39:49 +00:00
|
|
|
}
|
2020-01-20 17:54:52 +00:00
|
|
|
|
|
|
|
relative_data_path = new_path_to_table_data;
|
2019-12-19 19:39:49 +00:00
|
|
|
}
|
|
|
|
|
2021-01-27 18:43:41 +00:00
|
|
|
void StorageDistributed::delayInsertOrThrowIfNeeded() const
|
2021-01-26 18:45:37 +00:00
|
|
|
{
|
2021-01-27 18:43:41 +00:00
|
|
|
if (!distributed_settings.bytes_to_throw_insert &&
|
|
|
|
!distributed_settings.bytes_to_delay_insert)
|
2021-01-26 18:45:37 +00:00
|
|
|
return;
|
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
UInt64 total_bytes = *totalBytes(getContext()->getSettingsRef());
|
2021-01-27 18:43:41 +00:00
|
|
|
|
|
|
|
if (distributed_settings.bytes_to_throw_insert && total_bytes > distributed_settings.bytes_to_throw_insert)
|
2021-01-26 18:45:37 +00:00
|
|
|
{
|
2021-01-27 18:43:41 +00:00
|
|
|
ProfileEvents::increment(ProfileEvents::DistributedRejectedInserts);
|
2021-01-26 18:45:37 +00:00
|
|
|
throw Exception(ErrorCodes::DISTRIBUTED_TOO_MANY_PENDING_BYTES,
|
|
|
|
"Too many bytes pending for async INSERT: {} (bytes_to_throw_insert={})",
|
|
|
|
formatReadableSizeWithBinarySuffix(total_bytes),
|
|
|
|
formatReadableSizeWithBinarySuffix(distributed_settings.bytes_to_throw_insert));
|
|
|
|
}
|
2021-01-27 18:43:41 +00:00
|
|
|
|
|
|
|
if (distributed_settings.bytes_to_delay_insert && total_bytes > distributed_settings.bytes_to_delay_insert)
|
|
|
|
{
|
|
|
|
/// Step is 5% of the delay and minimal one second.
|
|
|
|
/// NOTE: max_delay_to_insert is in seconds, and step is in ms.
|
|
|
|
const size_t step_ms = std::min<double>(1., double(distributed_settings.max_delay_to_insert) * 1'000 * 0.05);
|
|
|
|
UInt64 delayed_ms = 0;
|
|
|
|
|
|
|
|
do {
|
|
|
|
delayed_ms += step_ms;
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(step_ms));
|
2021-04-10 23:33:54 +00:00
|
|
|
} while (*totalBytes(getContext()->getSettingsRef()) > distributed_settings.bytes_to_delay_insert && delayed_ms < distributed_settings.max_delay_to_insert*1000);
|
2021-01-27 18:43:41 +00:00
|
|
|
|
|
|
|
ProfileEvents::increment(ProfileEvents::DistributedDelayedInserts);
|
|
|
|
ProfileEvents::increment(ProfileEvents::DistributedDelayedInsertsMilliseconds, delayed_ms);
|
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
UInt64 new_total_bytes = *totalBytes(getContext()->getSettingsRef());
|
2021-01-27 18:43:41 +00:00
|
|
|
LOG_INFO(log, "Too many bytes pending for async INSERT: was {}, now {}, INSERT was delayed to {} ms",
|
|
|
|
formatReadableSizeWithBinarySuffix(total_bytes),
|
|
|
|
formatReadableSizeWithBinarySuffix(new_total_bytes),
|
|
|
|
delayed_ms);
|
|
|
|
|
|
|
|
if (new_total_bytes > distributed_settings.bytes_to_delay_insert)
|
|
|
|
{
|
|
|
|
ProfileEvents::increment(ProfileEvents::DistributedRejectedInserts);
|
|
|
|
throw Exception(ErrorCodes::DISTRIBUTED_TOO_MANY_PENDING_BYTES,
|
|
|
|
"Too many bytes pending for async INSERT: {} (bytes_to_delay_insert={})",
|
|
|
|
formatReadableSizeWithBinarySuffix(new_total_bytes),
|
|
|
|
formatReadableSizeWithBinarySuffix(distributed_settings.bytes_to_delay_insert));
|
|
|
|
}
|
|
|
|
}
|
2021-01-26 18:45:37 +00:00
|
|
|
}
|
2017-12-30 00:36:06 +00:00
|
|
|
|
|
|
|
/// Registers the "Distributed" table engine in `factory`.
void registerStorageDistributed(StorageFactory & factory)
{
    factory.registerStorage("Distributed", [](const StorageFactory::Arguments & args)
    {
        /** Arguments of engine is following:
          * - name of cluster in configuration;
          * - name of remote database;
          * - name of remote table;
          * - policy to store data in;
          *
          * Remote database may be specified in following form:
          * - identifier;
          * - constant expression with string result, like currentDatabase();
          * -- string literal as specific case;
          * - empty string means 'use default database from cluster'.
          *
          * Distributed engine also supports SETTINGS clause.
          */

        ASTs & engine_args = args.engine_args;
        const size_t num_args = engine_args.size();

        if (num_args < 3 || num_args > 5)
            throw Exception(
                "Storage Distributed requires from 3 to 5 parameters - "
                "name of configuration section with list of remote servers, "
                "name of remote database, "
                "name of remote table, "
                "sharding key expression (optional), "
                "policy to store data in (optional).",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        String cluster_name = getClusterNameAndMakeLiteral(engine_args[0]);

        const ContextPtr & context = args.getContext();
        const ContextPtr & local_context = args.getLocalContext();

        /// Remote database / table: identifiers or constant expressions, folded to literals.
        engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], local_context);
        engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], local_context);

        String remote_database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
        String remote_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();

        const ASTPtr sharding_key = num_args >= 4 ? engine_args[3] : nullptr;
        const String storage_policy = num_args >= 5 ? engine_args[4]->as<ASTLiteral &>().value.safeGet<String>() : "default";

        /// Check that sharding_key exists in the table and has numeric type.
        if (sharding_key)
        {
            auto sharding_expr = buildShardingKeyExpression(sharding_key, context, args.columns.getAllPhysical(), true);
            const Block & sample_block = sharding_expr->getSampleBlock();

            if (sample_block.columns() != 1)
                throw Exception("Sharding expression must return exactly one column", ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);

            const auto key_type = sample_block.getByPosition(0).type;
            if (!key_type->isValueRepresentedByInteger())
                throw Exception("Sharding expression has type " + key_type->getName() +
                    ", but should be one of integer type", ErrorCodes::TYPE_MISMATCH);
        }

        /// TODO: move some arguments from the arguments to the SETTINGS.
        DistributedSettings distributed_settings;
        if (args.storage_def->settings)
            distributed_settings.loadFromQuery(*args.storage_def);

        if (distributed_settings.max_delay_to_insert < 1)
            throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND,
                "max_delay_to_insert cannot be less then 1");

        const bool both_byte_limits_set = distributed_settings.bytes_to_throw_insert && distributed_settings.bytes_to_delay_insert;
        if (both_byte_limits_set &&
            distributed_settings.bytes_to_throw_insert <= distributed_settings.bytes_to_delay_insert)
        {
            throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND,
                "bytes_to_throw_insert cannot be less or equal to bytes_to_delay_insert (since it is handled first)");
        }

        /// Set default values from the distributed_directory_monitor_* global context settings.
        const auto & context_settings = context->getSettingsRef();
        if (!distributed_settings.monitor_batch_inserts.changed)
            distributed_settings.monitor_batch_inserts = context_settings.distributed_directory_monitor_batch_inserts;
        if (!distributed_settings.monitor_split_batch_on_failure.changed)
            distributed_settings.monitor_split_batch_on_failure = context_settings.distributed_directory_monitor_split_batch_on_failure;
        if (!distributed_settings.monitor_sleep_time_ms.changed)
            distributed_settings.monitor_sleep_time_ms = Poco::Timespan(context_settings.distributed_directory_monitor_sleep_time_ms);
        if (!distributed_settings.monitor_max_sleep_time_ms.changed)
            distributed_settings.monitor_max_sleep_time_ms = Poco::Timespan(context_settings.distributed_directory_monitor_max_sleep_time_ms);

        return StorageDistributed::create(
            args.table_id,
            args.columns,
            args.constraints,
            args.comment,
            remote_database,
            remote_table,
            cluster_name,
            context,
            sharding_key,
            storage_policy,
            args.relative_data_path,
            distributed_settings,
            args.attach);
    },
    {
        .supports_settings = true,
        .supports_parallel_insert = true,
        .source_access_type = AccessType::REMOTE,
    });
}
|
|
|
|
|
2012-05-21 20:38:34 +00:00
|
|
|
}
|