2018-12-19 12:38:13 +00:00
# include <Storages/StorageDistributed.h>
2017-04-01 09:19:00 +00:00
# include <Databases/IDatabase.h>
2020-05-04 20:15:38 +00:00
# include <Disks/StoragePolicy.h>
2020-01-20 17:54:52 +00:00
# include <Disks/DiskLocal.h>
2017-04-01 09:19:00 +00:00
2018-07-05 20:38:05 +00:00
# include <DataTypes/DataTypeFactory.h>
2018-12-19 12:38:13 +00:00
# include <DataTypes/DataTypesNumber.h>
2018-07-05 20:38:05 +00:00
2018-12-19 12:38:13 +00:00
# include <Storages/Distributed/DistributedBlockOutputStream.h>
2017-12-30 00:36:06 +00:00
# include <Storages/StorageFactory.h>
2018-12-25 23:14:39 +00:00
# include <Storages/AlterCommands.h>
2017-04-01 09:19:00 +00:00
2020-04-10 09:24:16 +00:00
# include <Columns/ColumnConst.h>
2018-01-22 15:56:30 +00:00
# include <Common/Macros.h>
2017-04-01 09:19:00 +00:00
# include <Common/escapeForFileName.h>
2017-07-13 20:58:19 +00:00
# include <Common/typeid_cast.h>
2020-04-16 21:54:43 +00:00
# include <Common/quoteString.h>
2017-04-01 09:19:00 +00:00
2018-12-19 12:38:13 +00:00
# include <Parsers/ASTDropQuery.h>
# include <Parsers/ASTExpressionList.h>
# include <Parsers/ASTIdentifier.h>
2017-04-01 09:19:00 +00:00
# include <Parsers/ASTInsertQuery.h>
2018-12-19 12:38:13 +00:00
# include <Parsers/ASTLiteral.h>
2017-04-01 09:19:00 +00:00
# include <Parsers/ASTSelectQuery.h>
2018-12-19 12:38:13 +00:00
# include <Parsers/ASTTablesInSelectQuery.h>
2017-04-01 09:19:00 +00:00
# include <Parsers/ParserAlterQuery.h>
2018-12-19 12:38:13 +00:00
# include <Parsers/TablePropertiesQueriesASTs.h>
2017-04-01 09:19:00 +00:00
# include <Parsers/parseQuery.h>
2018-12-19 12:38:13 +00:00
# include <Interpreters/ClusterProxy/SelectStreamFactory.h>
# include <Interpreters/ClusterProxy/executeQuery.h>
2020-06-13 16:31:28 +00:00
# include <Interpreters/Cluster.h>
2018-12-19 12:38:13 +00:00
# include <Interpreters/ExpressionAnalyzer.h>
2017-04-01 09:19:00 +00:00
# include <Interpreters/InterpreterAlterQuery.h>
# include <Interpreters/InterpreterDescribeQuery.h>
2018-12-19 12:38:13 +00:00
# include <Interpreters/InterpreterSelectQuery.h>
2019-07-26 17:43:42 +00:00
# include <Interpreters/TranslateQualifiedNamesVisitor.h>
2020-07-22 17:13:05 +00:00
# include <Interpreters/TreeRewriter.h>
2020-05-20 20:16:32 +00:00
# include <Interpreters/Context.h>
2018-12-19 12:38:13 +00:00
# include <Interpreters/createBlockSelector.h>
2017-12-30 00:36:06 +00:00
# include <Interpreters/evaluateConstantExpression.h>
# include <Interpreters/getClusterName.h>
2020-04-01 14:21:37 +00:00
# include <Interpreters/getTableExpressions.h>
2020-06-16 18:49:04 +00:00
# include <Functions/IFunction.h>
2017-04-01 09:19:00 +00:00
# include <Core/Field.h>
2020-06-13 16:31:28 +00:00
# include <Core/Settings.h>
2012-05-21 20:38:34 +00:00
2018-06-05 19:46:49 +00:00
# include <IO/ReadHelpers.h>
2020-11-09 19:07:38 +00:00
# include <IO/WriteBufferFromString.h>
# include <IO/Operators.h>
2018-06-05 19:46:49 +00:00
2016-12-12 03:33:34 +00:00
# include <Poco/DirectoryIterator.h>
2015-02-10 21:10:58 +00:00
# include <memory>
2019-07-31 22:37:41 +00:00
# include <filesystem>
2020-04-22 21:44:22 +00:00
# include <optional>
2020-09-18 19:25:56 +00:00
# include <cassert>
2017-05-10 06:39:37 +00:00
2016-12-12 03:33:34 +00:00
2020-01-23 17:48:05 +00:00
namespace
{

/// Named levels for forcing the "skip unused shards" optimization.
/// NOTE(review): presumably compared against the `force_optimize_skip_unused_shards` setting;
/// the usage is outside this part of the file — confirm.
constexpr UInt64 FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_HAS_SHARDING_KEY = 1;
constexpr UInt64 FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_ALWAYS = 2;

/// Value of the `distributed_group_by_no_merge` setting for which
/// StorageDistributed::getQueryProcessingStage() returns WithMergeableStateAfterAggregation
/// (any other non-zero value makes it return Complete).
constexpr UInt64 DISTRIBUTED_GROUP_BY_NO_MERGE_AFTER_AGGREGATION = 2;

}
2012-05-21 20:38:34 +00:00
namespace DB
{
2016-01-11 21:46:36 +00:00
/// Error codes referenced by this translation unit.
/// They are only declared here (`extern`); the definitions live elsewhere.
namespace ErrorCodes
{
2020-02-25 18:02:41 +00:00
extern const int NOT_IMPLEMENTED ;
2017-04-01 07:20:54 +00:00
extern const int STORAGE_REQUIRES_PARAMETER ;
2017-11-03 19:53:10 +00:00
extern const int BAD_ARGUMENTS ;
2017-12-30 00:36:06 +00:00
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH ;
extern const int INCORRECT_NUMBER_OF_COLUMNS ;
2018-03-16 02:08:31 +00:00
extern const int INFINITE_LOOP ;
2018-06-05 19:46:49 +00:00
extern const int TYPE_MISMATCH ;
2018-12-19 12:38:13 +00:00
extern const int TOO_MANY_ROWS ;
2020-01-23 17:48:05 +00:00
extern const int UNABLE_TO_SKIP_UNUSED_SHARDS ;
2016-01-11 21:46:36 +00:00
}
2019-04-08 05:13:16 +00:00
/// Action-lock type declared here (`extern`) and defined elsewhere.
/// NOTE(review): presumably identifies the lock that pauses background sending of
/// distributed data — its users are outside this part of the file; confirm.
namespace ActionLocks
{
2019-04-22 15:11:16 +00:00
extern const StorageActionBlockType DistributedSend ;
2019-04-08 05:13:16 +00:00
}
2016-01-11 21:46:36 +00:00
2014-08-21 12:07:29 +00:00
namespace
{
2017-04-01 07:20:54 +00:00
2018-07-25 12:31:47 +00:00
/// select query has database, table and table function names as AST pointers
/// Creates a copy of query, changes database, table and table function names.
2018-07-24 13:10:34 +00:00
ASTPtr rewriteSelectQuery ( const ASTPtr & query , const std : : string & database , const std : : string & table , ASTPtr table_function_ptr = nullptr )
2017-05-10 06:39:37 +00:00
{
2018-02-25 00:50:53 +00:00
auto modified_query_ast = query - > clone ( ) ;
2019-07-26 17:43:42 +00:00
ASTSelectQuery & select_query = modified_query_ast - > as < ASTSelectQuery & > ( ) ;
2020-09-16 09:57:26 +00:00
// Get rid of the settings clause so we don't send them to remote. Thus newly non-important
// settings won't break any remote parser. It's also more reasonable since the query settings
// are written into the query context and will be sent by the query pipeline.
select_query . setExpression ( ASTSelectQuery : : Expression : : SETTINGS , { } ) ;
2020-04-01 14:21:37 +00:00
if ( table_function_ptr )
select_query . addTableFunction ( table_function_ptr ) ;
else
select_query . replaceDatabaseAndTable ( database , table ) ;
2019-07-26 17:43:42 +00:00
2020-04-01 14:21:37 +00:00
/// Restore long column names (cause our short names are ambiguous).
/// TODO: aliased table functions & CREATE TABLE AS table function cases
if ( ! table_function_ptr )
2019-07-26 17:43:42 +00:00
{
RestoreQualifiedNamesVisitor : : Data data ;
2020-04-01 14:21:37 +00:00
data . distributed_table = DatabaseAndTableWithAlias ( * getTableExpression ( query - > as < ASTSelectQuery & > ( ) , 0 ) ) ;
data . remote_table . database = database ;
data . remote_table . table = table ;
data . rename = true ;
RestoreQualifiedNamesVisitor ( data ) . visit ( modified_query_ast ) ;
2019-07-26 17:43:42 +00:00
}
2017-05-10 06:39:37 +00:00
return modified_query_ast ;
}
2018-11-23 17:39:16 +00:00
/// The columns list in the original INSERT query is incorrect because inserted blocks are transformed
/// to the form of the sample block of the Distributed table. So we rewrite it and add all columns from
/// the sample block instead.
2019-05-31 18:31:09 +00:00
ASTPtr createInsertToRemoteTableQuery ( const std : : string & database , const std : : string & table , const Block & sample_block_non_materialized )
2017-05-10 06:39:37 +00:00
{
2018-11-23 17:39:16 +00:00
auto query = std : : make_shared < ASTInsertQuery > ( ) ;
2020-03-02 20:23:58 +00:00
query - > table_id = StorageID ( database , table ) ;
2017-05-10 06:39:37 +00:00
2019-05-31 18:31:09 +00:00
auto columns = std : : make_shared < ASTExpressionList > ( ) ;
query - > columns = columns ;
query - > children . push_back ( columns ) ;
for ( const auto & col : sample_block_non_materialized )
columns - > children . push_back ( std : : make_shared < ASTIdentifier > ( col . name ) ) ;
2017-05-10 06:39:37 +00:00
2018-11-23 17:39:16 +00:00
return query ;
2017-05-10 06:39:37 +00:00
}
/// Calculate maximum number in file names in directory and all subdirectories.
/// To ensure global order of data blocks yet to be sent across server restarts.
///
/// Only regular files whose name passes the endsWith() suffix check below are considered;
/// the part of the file name before the extension must parse as UInt64.
/// Returns 0 if no such file is found.
/// If a matching file has a non-numeric base name, the Exception thrown by parse() is
/// rethrown with the offending file name and its directory appended to the message.
2019-01-04 12:10:00 +00:00
UInt64 getMaximumFileNumber ( const std : : string & dir_path )
2017-05-10 06:39:37 +00:00
{
UInt64 res = 0 ;
2019-07-31 22:37:41 +00:00
/// A default-constructed recursive_directory_iterator is the end iterator.
std : : filesystem : : recursive_directory_iterator begin ( dir_path ) ;
std : : filesystem : : recursive_directory_iterator end ;
2017-05-10 06:39:37 +00:00
for ( auto it = begin ; it ! = end ; + + it )
2017-04-01 07:20:54 +00:00
{
2019-01-04 12:10:00 +00:00
const auto & file_path = it - > path ( ) ;
2017-04-01 07:20:54 +00:00
2019-07-31 22:37:41 +00:00
/// Skip directories, other non-regular entries and files with a different suffix.
if ( ! std : : filesystem : : is_regular_file ( * it ) | | ! endsWith ( file_path . filename ( ) . string ( ) , " .bin " ) )
2017-05-10 06:39:37 +00:00
continue ;
2017-04-01 07:20:54 +00:00
2017-05-10 06:39:37 +00:00
UInt64 num = 0 ;
try
{
2019-01-04 12:10:00 +00:00
num = parse < UInt64 > ( file_path . filename ( ) . stem ( ) . string ( ) ) ;
2017-05-10 06:39:37 +00:00
}
catch ( Exception & e )
{
2019-01-04 12:10:00 +00:00
e . addMessage ( " Unexpected file name " + file_path . filename ( ) . string ( ) + " found at " + file_path . parent_path ( ) . string ( ) + " , should have numeric base name. " ) ;
2017-05-10 06:39:37 +00:00
throw ;
}
/// Keep the running maximum.
if ( num > res )
res = num ;
2017-04-01 07:20:54 +00:00
}
2017-05-10 06:39:37 +00:00
return res ;
}
2019-08-19 20:28:24 +00:00
/// Formats the shard numbers of `cluster` as a bracketed, separator-delimited list.
/// The result is used in the debug log message of StorageDistributed::getQueryProcessingStage().
std : : string makeFormattedListOfShards ( const ClusterPtr & cluster )
{
2020-11-09 19:07:38 +00:00
WriteBufferFromOwnString buf ;
2019-08-19 20:28:24 +00:00
/// True until the first shard number has been written.
bool head = true ;
2020-11-09 19:07:38 +00:00
buf < < " [ " ;
2019-08-19 20:28:24 +00:00
for ( const auto & shard_info : cluster - > getShardsInfo ( ) )
{
2020-11-09 19:07:38 +00:00
/// The separator is written before every element except the first one.
( head ? buf : buf < < " , " ) < < shard_info . shard_num ;
2019-08-19 20:28:24 +00:00
head = false ;
}
2020-11-09 19:07:38 +00:00
buf < < " ] " ;
2019-08-19 20:28:24 +00:00
2020-11-09 19:07:38 +00:00
return buf . str ( ) ;
2019-08-19 20:28:24 +00:00
}
2020-03-22 10:37:35 +00:00
/// Analyzes the sharding key AST against `columns` and returns the actions that compute it.
/// `project` is forwarded to ExpressionAnalyzer::getActions().
/// The pointer is copied (the AST is not cloned) before being handed to the analyzer.
ExpressionActionsPtr buildShardingKeyExpression(const ASTPtr & sharding_key, const Context & context, const NamesAndTypesList & columns, bool project)
{
    ASTPtr key_ast = sharding_key;
    auto analysis = TreeRewriter(context).analyze(key_ast, columns);

    ExpressionAnalyzer analyzer(key_ast, analysis, context);
    return analyzer.getActions(project);
}
2020-06-16 18:49:04 +00:00
bool isExpressionActionsDeterministics ( const ExpressionActionsPtr & actions )
{
for ( const auto & action : actions - > getActions ( ) )
{
2020-11-10 14:54:59 +00:00
if ( action . node - > type ! = ActionsDAG : : ActionType : : FUNCTION )
2020-06-16 18:49:04 +00:00
continue ;
2020-11-03 11:28:28 +00:00
if ( ! action . node - > function_base - > isDeterministic ( ) )
2020-06-16 18:49:04 +00:00
return false ;
}
return true ;
}
2020-03-22 10:37:35 +00:00
class ReplacingConstantExpressionsMatcher
{
public :
using Data = Block ;
2014-08-13 12:52:30 +00:00
2020-03-22 10:37:35 +00:00
static bool needChildVisit ( ASTPtr & , const ASTPtr & )
{
return true ;
}
static void visit ( ASTPtr & node , Block & block_with_constants )
{
if ( ! node - > as < ASTFunction > ( ) )
return ;
2017-06-06 18:48:38 +00:00
2020-03-22 10:37:35 +00:00
std : : string name = node - > getColumnName ( ) ;
if ( block_with_constants . has ( name ) )
{
auto result = block_with_constants . getByName ( name ) ;
if ( ! isColumnConst ( * result . column ) )
return ;
2020-03-23 17:28:38 +00:00
node = std : : make_shared < ASTLiteral > ( assert_cast < const ColumnConst & > ( * result . column ) . getField ( ) ) ;
2020-03-22 10:37:35 +00:00
}
}
} ;
2020-03-23 17:28:38 +00:00
2020-06-18 09:08:24 +00:00
/// Rewrites `node` in place: function sub-expressions whose values are known constants
/// (as computed by KeyCondition::getBlockWithConstants) are replaced with literals.
void replaceConstantExpressions(
    ASTPtr & node,
    const Context & context,
    const NamesAndTypesList & columns,
    ConstStoragePtr storage,
    const StorageMetadataPtr & metadata_snapshot)
{
    using ConstantReplacer = InDepthNodeVisitor<ReplacingConstantExpressionsMatcher, true>;

    auto analysis = TreeRewriter(context).analyze(node, columns, storage, metadata_snapshot);
    Block constants = KeyCondition::getBlockWithConstants(node, analysis, context);

    ConstantReplacer(constants).visit(node);
}
2017-06-06 18:48:38 +00:00
2020-04-22 21:44:22 +00:00
/// Returns one of the following:
/// - QueryProcessingStage::Complete
/// - QueryProcessingStage::WithMergeableStateAfterAggregation
/// - none (in this case regular WithMergeableState should be used)
std : : optional < QueryProcessingStage : : Enum > getOptimizedQueryProcessingStage ( const ASTPtr & query_ptr , bool extremes , const Block & sharding_key_block )
2020-04-01 18:38:01 +00:00
{
2020-04-22 21:44:22 +00:00
const auto & select = query_ptr - > as < ASTSelectQuery & > ( ) ;
auto sharding_block_has = [ & ] ( const auto & exprs , size_t limit = SIZE_MAX ) - > bool
{
size_t i = 0 ;
for ( auto & expr : exprs )
{
2020-09-03 01:39:36 +00:00
+ + i ;
if ( i > limit )
2020-04-22 21:44:22 +00:00
break ;
auto id = expr - > template as < ASTIdentifier > ( ) ;
if ( ! id )
return false ;
/// TODO: if GROUP BY contains multiIf()/if() it should contain only columns from sharding_key
2020-10-24 18:46:10 +00:00
if ( ! sharding_key_block . has ( id - > name ( ) ) )
2020-04-22 21:44:22 +00:00
return false ;
}
return true ;
} ;
// GROUP BY qualifiers
// - TODO: WITH TOTALS can be implemented
// - TODO: WITH ROLLUP can be implemented (I guess)
if ( select . group_by_with_totals | | select . group_by_with_rollup | | select . group_by_with_cube )
return { } ;
// TODO: extremes support can be implemented
if ( extremes )
return { } ;
// DISTINCT
if ( select . distinct )
{
if ( ! sharding_block_has ( select . select ( ) - > children ) )
return { } ;
}
2020-04-01 18:38:01 +00:00
2020-04-22 21:44:22 +00:00
// GROUP BY
const ASTPtr group_by = select . groupBy ( ) ;
if ( ! group_by )
{
if ( ! select . distinct )
return { } ;
}
else
{
if ( ! sharding_block_has ( group_by - > children , 1 ) )
return { } ;
}
// ORDER BY
const ASTPtr order_by = select . orderBy ( ) ;
if ( order_by )
return QueryProcessingStage : : WithMergeableStateAfterAggregation ;
// LIMIT BY
// LIMIT
2020-12-02 17:11:39 +00:00
// OFFSET
if ( select . limitBy ( ) | | select . limitLength ( ) | | select . limitOffset ( ) )
2020-04-22 21:44:22 +00:00
return QueryProcessingStage : : WithMergeableStateAfterAggregation ;
// Only simple SELECT FROM GROUP BY sharding_key can use Complete state.
return QueryProcessingStage : : Complete ;
}
size_t getClusterQueriedNodes ( const Settings & settings , const ClusterPtr & cluster )
{
2020-04-01 18:38:01 +00:00
size_t num_local_shards = cluster - > getLocalShardCount ( ) ;
size_t num_remote_shards = cluster - > getRemoteShardCount ( ) ;
2020-04-22 21:44:22 +00:00
return ( num_remote_shards * settings . max_parallel_replicas ) + num_local_shards ;
2020-04-01 18:38:01 +00:00
}
2020-03-26 18:25:26 +00:00
}
2020-03-22 10:37:35 +00:00
/// Defined out of line (although defaulted) for destruction of std::unique_ptr of a type
/// that is incomplete in the class definition: the destructor has to be generated where
/// that type is complete.
StorageDistributed : : ~ StorageDistributed ( ) = default ;
2020-01-20 17:54:52 +00:00
2020-04-24 09:20:09 +00:00
2020-04-28 10:38:57 +00:00
/// Returns the names and types of the virtual columns of the table.
/// The list is built anew on every call.
NamesAndTypesList StorageDistributed : : getVirtuals ( ) const
2020-04-27 13:55:30 +00:00
{
/// NOTE This is weird. Most of these virtual columns are part of MergeTree
/// tables info. But Distributed is general-purpose engine.
2020-04-28 10:38:57 +00:00
return NamesAndTypesList {
2020-04-27 13:55:30 +00:00
NameAndTypePair ( " _table " , std : : make_shared < DataTypeString > ( ) ) ,
NameAndTypePair ( " _part " , std : : make_shared < DataTypeString > ( ) ) ,
NameAndTypePair ( " _part_index " , std : : make_shared < DataTypeUInt64 > ( ) ) ,
NameAndTypePair ( " _partition_id " , std : : make_shared < DataTypeString > ( ) ) ,
NameAndTypePair ( " _sample_factor " , std : : make_shared < DataTypeFloat64 > ( ) ) ,
NameAndTypePair ( " _shard_num " , std : : make_shared < DataTypeUInt32 > ( ) ) ,
} ;
}
2020-04-24 09:20:09 +00:00
2014-09-30 03:08:47 +00:00
/// Creates a Distributed table that reads from / writes to `remote_database_`.`remote_table_`.
///
/// - columns_, constraints_: stored as the in-memory metadata of the table.
/// - cluster_name_: expanded with the macros of the global context before being stored.
/// - context_: only its global context is kept.
/// - sharding_key_: may be null. When set, its expression, its column name and whether it is
///   deterministic are computed here.
/// - storage_policy_name_, relative_data_path_: when the relative data path is not empty, the storage
///   policy is resolved and its first volume (index 0) becomes the data volume.
/// - attach_: when true, the "table looks at itself" sanity check is skipped.
/// - owned_cluster_: moved into `owned_cluster`.
///
/// Throws INFINITE_LOOP when the table is being created (not attached), the cluster has local shards
/// and the remote database/table names are equal to the names of this table.
StorageDistributed : : StorageDistributed (
2019-12-04 16:06:55 +00:00
const StorageID & id_ ,
2018-03-06 20:18:34 +00:00
const ColumnsDescription & columns_ ,
2019-08-24 21:20:20 +00:00
const ConstraintsDescription & constraints_ ,
2017-04-01 07:20:54 +00:00
const String & remote_database_ ,
const String & remote_table_ ,
const String & cluster_name_ ,
2017-05-23 18:37:14 +00:00
const Context & context_ ,
2017-04-01 07:20:54 +00:00
const ASTPtr & sharding_key_ ,
2020-07-23 14:10:48 +00:00
const String & storage_policy_name_ ,
2019-10-25 19:07:47 +00:00
const String & relative_data_path_ ,
2020-10-14 12:19:29 +00:00
bool attach_ ,
ClusterPtr owned_cluster_ )
2020-04-27 13:55:30 +00:00
: IStorage ( id_ )
2019-12-04 16:06:55 +00:00
, remote_database ( remote_database_ )
, remote_table ( remote_table_ )
2020-11-06 14:07:56 +00:00
, global_context ( context_ . getGlobalContext ( ) )
2020-05-30 21:57:37 +00:00
, log ( & Poco : : Logger : : get ( " StorageDistributed ( " + id_ . table_name + " ) " ) )
2020-10-14 12:19:29 +00:00
, owned_cluster ( std : : move ( owned_cluster_ ) )
2020-11-06 14:07:56 +00:00
, cluster_name ( global_context . getMacros ( ) - > expand ( cluster_name_ ) )
2019-12-04 16:06:55 +00:00
, has_sharding_key ( sharding_key_ )
2020-01-20 17:54:52 +00:00
, relative_data_path ( relative_data_path_ )
2014-09-30 03:08:47 +00:00
{
2020-06-19 15:39:41 +00:00
StorageInMemoryMetadata storage_metadata ;
storage_metadata . setColumns ( columns_ ) ;
storage_metadata . setConstraints ( constraints_ ) ;
setInMemoryMetadata ( storage_metadata ) ;
2019-08-24 21:20:20 +00:00
2019-08-26 13:46:07 +00:00
/// The sharding key is analyzed against the physical columns of the table.
if ( sharding_key_ )
{
2020-11-06 14:07:56 +00:00
sharding_key_expr = buildShardingKeyExpression ( sharding_key_ , global_context , storage_metadata . getColumns ( ) . getAllPhysical ( ) , false ) ;
2019-08-26 13:46:07 +00:00
sharding_key_column_name = sharding_key_ - > getColumnName ( ) ;
2020-06-16 18:49:04 +00:00
sharding_key_is_deterministic = isExpressionActionsDeterministics ( sharding_key_expr ) ;
2019-08-26 13:46:07 +00:00
}
2020-01-20 17:54:52 +00:00
/// Without a data path no storage policy / data volume is set up.
if ( ! relative_data_path . empty ( ) )
2020-07-23 14:10:48 +00:00
{
2020-11-06 14:07:56 +00:00
storage_policy = global_context . getStoragePolicy ( storage_policy_name_ ) ;
2020-09-15 09:26:56 +00:00
data_volume = storage_policy - > getVolume ( 0 ) ;
if ( storage_policy - > getVolumes ( ) . size ( ) > 1 )
LOG_WARNING ( log , " Storage policy for Distributed table has multiple volumes. "
" Only {} volume will be used to store data. Other will be ignored. " , data_volume - > getName ( ) ) ;
2020-07-23 14:10:48 +00:00
}
2020-01-20 17:54:52 +00:00
2018-03-16 02:08:31 +00:00
/// Sanity check. Skip check if the table is already created to allow the server to start.
2019-08-03 11:02:40 +00:00
if ( ! attach_ & & ! cluster_name . empty ( ) )
2018-03-16 02:08:31 +00:00
{
2020-11-06 14:07:56 +00:00
size_t num_local_shards = global_context . getCluster ( cluster_name ) - > getLocalShardCount ( ) ;
2019-12-04 16:06:55 +00:00
if ( num_local_shards & & remote_database = = id_ . database_name & & remote_table = = id_ . table_name )
throw Exception ( " Distributed table " + id_ . table_name + " looks at itself " , ErrorCodes : : INFINITE_LOOP ) ;
2018-03-16 02:08:31 +00:00
}
2014-09-30 03:08:47 +00:00
}
2017-03-09 03:34:09 +00:00
2018-07-24 13:10:34 +00:00
/// Variant of the constructor for a table that reads from a remote table function.
/// Delegates to the main constructor with empty remote database and table names,
/// then stores the table function AST in `remote_table_function_ptr`.
StorageDistributed : : StorageDistributed (
2019-12-04 16:06:55 +00:00
const StorageID & id_ ,
2018-03-12 13:47:01 +00:00
const ColumnsDescription & columns_ ,
2019-08-24 21:20:20 +00:00
const ConstraintsDescription & constraints_ ,
2018-07-24 13:10:34 +00:00
ASTPtr remote_table_function_ptr_ ,
const String & cluster_name_ ,
const Context & context_ ,
const ASTPtr & sharding_key_ ,
2020-07-23 14:10:48 +00:00
const String & storage_policy_name_ ,
2019-10-25 19:07:47 +00:00
const String & relative_data_path_ ,
2020-10-14 12:19:29 +00:00
bool attach ,
ClusterPtr owned_cluster_ )
: StorageDistributed ( id_ , columns_ , constraints_ , String { } , String { } , cluster_name_ , context_ , sharding_key_ , storage_policy_name_ , relative_data_path_ , attach , std : : move ( owned_cluster_ ) )
2018-07-24 13:10:34 +00:00
{
2019-12-04 16:06:55 +00:00
remote_table_function_ptr = std : : move ( remote_table_function_ptr_ ) ;
2018-07-24 13:10:34 +00:00
}
2020-11-07 21:30:40 +00:00
/// Chooses the stage up to which the shards process the query, and records the cluster
/// that will be queried in `query_info.cluster`.
///
/// Decision order (first match wins):
/// 1. `distributed_group_by_no_merge` is set: WithMergeableStateAfterAggregation when it equals
///    DISTRIBUTED_GROUP_BY_NO_MERGE_AFTER_AGGREGATION, Complete otherwise.
/// 2. `to_stage` is WithMergeableState: WithMergeableState.
/// 3. Exactly one node is queried: Complete.
/// 4. Skipping of unused shards and the GROUP BY sharding key optimization are enabled, the table has
///    a sharding key that is deterministic (or non-deterministic keys are allowed), and
///    getOptimizedQueryProcessingStage() returns a stage: that stage.
/// 5. Otherwise: WithMergeableState.
QueryProcessingStage : : Enum StorageDistributed : : getQueryProcessingStage (
const Context & context , QueryProcessingStage : : Enum to_stage , SelectQueryInfo & query_info ) const
2020-04-16 21:54:43 +00:00
{
2020-04-30 23:47:19 +00:00
const auto & settings = context . getSettingsRef ( ) ;
2020-06-17 16:39:58 +00:00
auto metadata_snapshot = getInMemoryMetadataPtr ( ) ;
2020-04-30 23:47:19 +00:00
2020-09-10 19:55:36 +00:00
/// Start with the full cluster; it is replaced below if some shards can be skipped.
ClusterPtr cluster = getCluster ( ) ;
query_info . cluster = cluster ;
/// Always calculate optimized cluster here, to avoid conditions during read()
/// (Anyway it will be calculated in the read())
if ( settings . optimize_skip_unused_shards )
{
ClusterPtr optimized_cluster = getOptimizedCluster ( context , metadata_snapshot , query_info . query ) ;
if ( optimized_cluster )
{
LOG_DEBUG ( log , " Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): {} " , makeFormattedListOfShards ( optimized_cluster ) ) ;
cluster = optimized_cluster ;
query_info . cluster = cluster ;
}
else
{
LOG_DEBUG ( log , " Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - the query will be sent to all shards of the cluster{} " , has_sharding_key ? " " : " (no sharding key) " ) ;
}
}
2020-04-22 21:44:22 +00:00
if ( settings . distributed_group_by_no_merge )
2020-08-15 13:25:30 +00:00
{
if ( settings . distributed_group_by_no_merge = = DISTRIBUTED_GROUP_BY_NO_MERGE_AFTER_AGGREGATION )
return QueryProcessingStage : : WithMergeableStateAfterAggregation ;
else
return QueryProcessingStage : : Complete ;
}
2020-04-16 21:54:43 +00:00
2020-04-22 21:44:22 +00:00
/// Nested distributed query cannot return Complete stage,
/// since the parent query need to aggregate the results after.
if ( to_stage = = QueryProcessingStage : : WithMergeableState )
return QueryProcessingStage : : WithMergeableState ;
/// If there is only one node, the query can be fully processed by the
/// shard, initiator will work as a proxy only.
if ( getClusterQueriedNodes ( settings , cluster ) = = 1 )
return QueryProcessingStage : : Complete ;
if ( settings . optimize_skip_unused_shards & &
settings . optimize_distributed_group_by_sharding_key & &
has_sharding_key & &
2020-10-16 21:58:06 +00:00
( settings . allow_nondeterministic_optimize_skip_unused_shards | | sharding_key_is_deterministic ) )
2020-04-22 21:44:22 +00:00
{
/// The sample block of the sharding key expression lists the columns that count as "sharding key columns".
Block sharding_key_block = sharding_key_expr - > getSampleBlock ( ) ;
2020-09-10 19:55:36 +00:00
auto stage = getOptimizedQueryProcessingStage ( query_info . query , settings . extremes , sharding_key_block ) ;
2020-04-22 21:44:22 +00:00
if ( stage )
{
LOG_DEBUG ( log , " Force processing stage to {} " , QueryProcessingStage : : toString ( * stage ) ) ;
return * stage ;
}
}
return QueryProcessingStage : : WithMergeableState ;
2020-03-18 00:57:00 +00:00
}
2020-08-03 13:54:14 +00:00
/// Pipe-returning overload of read(): fills a local query plan through the
/// QueryPlan-based overload and converts that plan into a Pipe.
Pipe StorageDistributed::read(
    const Names & column_names,
    const StorageMetadataPtr & metadata_snapshot,
    SelectQueryInfo & query_info,
    const Context & context,
    QueryProcessingStage::Enum processed_stage,
    const size_t max_block_size,
    const unsigned num_streams)
{
    QueryPlan local_plan;

    read(
        local_plan,
        column_names,
        metadata_snapshot,
        query_info,
        context,
        processed_stage,
        max_block_size,
        num_streams);

    return local_plan.convertToPipe();
}
/// Fills `query_plan` with the steps that read the data from the remote tables.
///
/// The SELECT is rewritten to address the remote database/table (or the remote table function),
/// the result header is taken from interpreting the original query up to `processed_stage`,
/// and building the plan is delegated to ClusterProxy::executeQuery().
/// The max_block_size and num_streams arguments are not used.
void StorageDistributed : : read (
2020-09-18 14:16:53 +00:00
QueryPlan & query_plan ,
2019-09-18 21:17:00 +00:00
const Names & column_names ,
2020-06-17 14:37:21 +00:00
const StorageMetadataPtr & metadata_snapshot ,
2020-09-20 17:52:17 +00:00
SelectQueryInfo & query_info ,
2018-04-19 14:47:09 +00:00
const Context & context ,
QueryProcessingStage : : Enum processed_stage ,
const size_t /*max_block_size*/ ,
const unsigned /*num_streams*/ )
{
2017-04-01 07:20:54 +00:00
const auto & modified_query_ast = rewriteSelectQuery (
2018-07-24 13:10:34 +00:00
query_info . query , remote_database , remote_table , remote_table_function_ptr ) ;
2015-02-10 20:48:17 +00:00
2019-08-09 13:37:42 +00:00
/// The header is computed from the original (not rewritten) query.
Block header =
InterpreterSelectQuery ( query_info . query , context , SelectQueryOptions ( processed_stage ) ) . getSampleBlock ( ) ;
2018-02-15 18:54:12 +00:00
2019-10-19 20:36:35 +00:00
/// Scalars come from the query context when there is one; otherwise an empty set is used.
const Scalars & scalars = context . hasQueryContext ( ) ? context . getQueryContext ( ) . getScalars ( ) : Scalars { } ;
2019-09-18 21:17:00 +00:00
/// The shard number column counts as requested only if it is among the requested names
/// and isVirtualColumn() confirms it for this metadata snapshot.
bool has_virtual_shard_num_column = std : : find ( column_names . begin ( ) , column_names . end ( ) , " _shard_num " ) ! = column_names . end ( ) ;
2020-06-17 14:37:21 +00:00
if ( has_virtual_shard_num_column & & ! isVirtualColumn ( " _shard_num " , metadata_snapshot ) )
2019-09-18 21:17:00 +00:00
has_virtual_shard_num_column = false ;
2018-09-10 03:59:48 +00:00
/// The factory is constructed either for the remote table function or for the remote table.
ClusterProxy : : SelectStreamFactory select_stream_factory = remote_table_function_ptr
? ClusterProxy : : SelectStreamFactory (
2019-09-18 21:17:00 +00:00
header , processed_stage , remote_table_function_ptr , scalars , has_virtual_shard_num_column , context . getExternalTables ( ) )
2018-07-25 12:31:47 +00:00
: ClusterProxy : : SelectStreamFactory (
2020-03-04 20:29:52 +00:00
header , processed_stage , StorageID { remote_database , remote_table } , scalars , has_virtual_shard_num_column , context . getExternalTables ( ) ) ;
2015-11-06 17:44:01 +00:00
2020-11-10 12:02:22 +00:00
ClusterProxy : : executeQuery ( query_plan , select_stream_factory , log ,
modified_query_ast , context , query_info ) ;
2012-05-21 20:38:34 +00:00
}
2017-03-09 03:34:09 +00:00
2020-06-16 12:48:10 +00:00
/// Creates the output stream used by INSERT into the Distributed table.
/// The query AST argument is not used: the INSERT sent to the remote table is built from the
/// non-materialized sample block of `metadata_snapshot`.
///
/// Throws:
/// - BAD_ARGUMENTS if an asynchronous insert is requested while the table has neither a storage
///   policy nor an owned cluster;
/// - STORAGE_REQUIRES_PARAMETER if there is no sharding key and the cluster has two or more shards.
BlockOutputStreamPtr StorageDistributed : : write ( const ASTPtr & , const StorageMetadataPtr & metadata_snapshot , const Context & context )
2014-08-12 13:46:46 +00:00
{
2018-03-16 02:08:31 +00:00
auto cluster = getCluster ( ) ;
2019-02-27 18:26:24 +00:00
const auto & settings = context . getSettingsRef ( ) ;
2016-10-10 08:44:52 +00:00
2018-01-25 12:18:27 +00:00
/// Ban an attempt to make async insert into the table belonging to DatabaseMemory
2020-07-23 14:10:48 +00:00
if ( ! storage_policy & & ! owned_cluster & & ! settings . insert_distributed_sync )
2018-01-25 12:18:27 +00:00
{
throw Exception ( " Storage " + getName ( ) + " must has own data directory to enable asynchronous inserts " ,
ErrorCodes : : BAD_ARGUMENTS ) ;
}
2016-10-10 08:44:52 +00:00
2018-01-25 12:18:27 +00:00
/// If sharding key is not specified, then you can only write to a cluster containing only one shard
if ( ! has_sharding_key & & ( ( cluster - > getLocalShardCount ( ) + cluster - > getRemoteShardCount ( ) ) > = 2 ) )
{
throw Exception ( " Method write is not supported by storage " + getName ( ) + " with more than one shard and no sharding key provided " ,
ErrorCodes : : STORAGE_REQUIRES_PARAMETER ) ;
}
2014-08-21 12:07:29 +00:00
2018-01-25 12:18:27 +00:00
/// Force sync insertion if it is remote() table function
2017-12-11 18:09:20 +00:00
bool insert_sync = settings . insert_distributed_sync | | owned_cluster ;
2017-11-02 14:01:11 +00:00
auto timeout = settings . insert_distributed_timeout ;
2017-04-01 07:20:54 +00:00
/// DistributedBlockOutputStream will not own cluster, but will own ConnectionPools of the cluster
return std : : make_shared < DistributedBlockOutputStream > (
2020-06-16 15:51:29 +00:00
context , * this , metadata_snapshot , createInsertToRemoteTableQuery ( remote_database , remote_table , metadata_snapshot - > getSampleBlockNonMaterialized ( ) ) , cluster ,
2019-04-08 10:04:26 +00:00
insert_sync , timeout ) ;
2014-08-12 13:46:46 +00:00
}
2017-03-09 03:34:09 +00:00
2020-06-10 11:16:31 +00:00
/// Verifies that every command in `commands` is supported by this storage.
/// Only ADD_COLUMN, MODIFY_COLUMN, DROP_COLUMN, COMMENT_COLUMN and RENAME_COLUMN are accepted;
/// the first command of any other type results in a NOT_IMPLEMENTED exception.
/// The settings argument is not used.
void StorageDistributed : : checkAlterIsPossible ( const AlterCommands & commands , const Settings & /* settings */ ) const
2013-09-23 12:01:19 +00:00
{
2019-12-26 18:17:05 +00:00
for ( const auto & command : commands )
{
if ( command . type ! = AlterCommand : : Type : : ADD_COLUMN
& & command . type ! = AlterCommand : : Type : : MODIFY_COLUMN
& & command . type ! = AlterCommand : : Type : : DROP_COLUMN
2020-05-07 12:54:35 +00:00
& & command . type ! = AlterCommand : : Type : : COMMENT_COLUMN
& & command . type ! = AlterCommand : : Type : : RENAME_COLUMN )
2016-05-13 21:08:19 +00:00
2019-12-26 18:17:05 +00:00
throw Exception ( " Alter of type ' " + alterTypeToString ( command . type ) + " ' is not supported by storage " + getName ( ) ,
ErrorCodes : : NOT_IMPLEMENTED ) ;
}
}
2020-06-18 16:10:47 +00:00
void StorageDistributed : : alter ( const AlterCommands & params , const Context & context , TableLockHolder & )
2013-09-23 12:01:19 +00:00
{
2019-12-10 20:47:05 +00:00
auto table_id = getStorageID ( ) ;
2019-08-26 14:50:34 +00:00
2019-12-26 18:17:05 +00:00
checkAlterIsPossible ( params , context . getSettingsRef ( ) ) ;
2020-06-09 17:28:29 +00:00
StorageInMemoryMetadata new_metadata = getInMemoryMetadata ( ) ;
params . apply ( new_metadata , context ) ;
DatabaseCatalog : : instance ( ) . getDatabase ( table_id . database_name ) - > alterTable ( context , table_id , new_metadata ) ;
2020-06-15 16:55:33 +00:00
setInMemoryMetadata ( new_metadata ) ;
2013-09-23 12:01:19 +00:00
}
2014-02-04 15:44:15 +00:00
2017-03-09 03:34:09 +00:00
2017-06-06 17:06:14 +00:00
/// Prepares the table for work.
/// - Logs a warning when the remote database name is empty and no remote table function is set.
/// - Does nothing else when the table has no storage policy.
/// - Otherwise creates directory monitors for every disk of the data volume and raises
///   `file_names_increment` to the largest file number found under the data paths.
void StorageDistributed : : startup ( )
{
2020-02-21 13:44:44 +00:00
if ( remote_database . empty ( ) & & ! remote_table_function_ptr )
2020-05-23 22:24:01 +00:00
LOG_WARNING ( log , " Name of remote database is empty. Default database will be used implicitly. " ) ;
2020-02-21 13:44:44 +00:00
2020-07-23 14:10:48 +00:00
if ( ! storage_policy )
2020-01-20 17:54:52 +00:00
return ;
2020-09-15 09:26:56 +00:00
for ( const DiskPtr & disk : data_volume - > getDisks ( ) )
2020-01-20 17:54:52 +00:00
createDirectoryMonitors ( disk - > getPath ( ) ) ;
/// The increment only ever grows here: it is replaced when a path holds a larger file number.
for ( const String & path : getDataPaths ( ) )
{
UInt64 inc = getMaximumFileNumber ( path ) ;
if ( inc > file_names_increment . value )
file_names_increment . value . store ( inc ) ;
}
2020-05-23 22:24:01 +00:00
LOG_DEBUG ( log , " Auto-increment is {} " , file_names_increment . value ) ;
2017-06-06 17:06:14 +00:00
}
2014-08-13 11:26:13 +00:00
void StorageDistributed : : shutdown ( )
{
2020-04-24 23:03:27 +00:00
monitors_blocker . cancelForever ( ) ;
2020-04-24 23:03:26 +00:00
std : : lock_guard lock ( cluster_nodes_mutex ) ;
Fix DROP TABLE for Distributed (racy with INSERT)
<details>
```
drop() on T1275:
0 DB::StorageDistributed::drop (this=0x7f9ed34f0000) at ../contrib/libcxx/include/__hash_table:966
1 0x000000000d557242 in DB::DatabaseOnDisk::dropTable (this=0x7f9fc22706d8, context=..., table_name=...)
at ../contrib/libcxx/include/new:340
2 0x000000000d6fcf7c in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f9e42560dc0, query=...)
at ../contrib/libcxx/include/memory:3826
3 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f9e42560dc0) at ../src/Interpreters/InterpreterDropQuery.cpp:50
4 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>,
stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420
5 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>,
may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487
6 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254
7 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326
8 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000)
at ../contrib/poco/Net/src/TCPServerConnection.cpp:43
9 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00)
at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114
10 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199
11 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>)
at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401
12 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477
13 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
StorageDistributedDirectoryMonitor on T166:
0 DB::StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor (this=0x7f9ea7ab1400, storage_=..., path_=...,
pool_=..., monitor_blocker_=..., bg_pool_=...) at ../src/Storages/Distributed/DirectoryMonitor.cpp:81
1 0x000000000dbf684e in std::__1::make_unique<> () at ../contrib/libcxx/include/memory:3474
2 DB::StorageDistributed::requireDirectoryMonitor (this=0x7f9ed34f0000, disk=..., name=...)
at ../src/Storages/StorageDistributed.cpp:682
3 0x000000000de3d5fa in DB::DistributedBlockOutputStream::writeToShard (this=this@entry=0x7f9ed39c7418, block=..., dir_names=...)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:634
4 0x000000000de3e214 in DB::DistributedBlockOutputStream::writeAsyncImpl (this=this@entry=0x7f9ed39c7418, block=...,
shard_id=shard_id@entry=79) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:539
5 0x000000000de3e47b in DB::DistributedBlockOutputStream::writeSplitAsync (this=this@entry=0x7f9ed39c7418, block=...)
at ../contrib/libcxx/include/vector:1546
6 0x000000000de3eab0 in DB::DistributedBlockOutputStream::writeAsync (block=..., this=0x7f9ed39c7418)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:141
7 DB::DistributedBlockOutputStream::write (this=0x7f9ed39c7418, block=...)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:135
8 0x000000000d73b376 in DB::PushingToViewsBlockOutputStream::write (this=this@entry=0x7f9ea7a8cf58, block=...)
at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:157
9 0x000000000d7853eb in DB::AddingDefaultBlockOutputStream::write (this=0x7f9ed383d118, block=...)
at ../contrib/libcxx/include/memory:3826
10 0x000000000d740790 in DB::SquashingBlockOutputStream::write (this=0x7f9ed383de18, block=...)
at ../contrib/libcxx/include/memory:3826
11 0x000000000d68c308 in DB::CountingBlockOutputStream::write (this=0x7f9ea7ac6d60, block=...)
at ../contrib/libcxx/include/memory:3826
12 0x000000000ddab449 in DB::StorageBuffer::writeBlockToDestination (this=this@entry=0x7f9fbd56a000, block=..., table=...)
at ../src/Storages/StorageBuffer.cpp:747
13 0x000000000ddabfa6 in DB::StorageBuffer::flushBuffer (this=this@entry=0x7f9fbd56a000, buffer=...,
check_thresholds=check_thresholds@entry=true, locked=locked@entry=false, reset_block_structure=reset_block_structure@entry=false)
at ../src/Storages/StorageBuffer.cpp:661
14 0x000000000ddac415 in DB::StorageBuffer::flushAllBuffers (reset_blocks_structure=false, check_thresholds=true, this=0x7f9fbd56a000)
at ../src/Storages/StorageBuffer.cpp:605
shutdown() on T1275:
0 DB::StorageDistributed::shutdown (this=0x7f9ed34f0000) at ../contrib/libcxx/include/atomic:1612
1 0x000000000d6fd938 in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f98530c79a0, query=...)
at ../src/Storages/TableLockHolder.h:12
2 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f98530c79a0) at ../src/Interpreters/InterpreterDropQuery.cpp:50
3 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>,
stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420
4 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>,
may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487
5 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254
6 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326
7 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000)
at ../contrib/poco/Net/src/TCPServerConnection.cpp:43
8 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00)
at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114
9 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199
10 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>)
at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401
11 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477
12 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
```
</details>
2020-10-26 20:01:06 +00:00
LOG_DEBUG ( log , " Joining background threads for async INSERT " ) ;
2017-07-27 15:24:39 +00:00
cluster_nodes_data . clear ( ) ;
Fix DROP TABLE for Distributed (racy with INSERT)
<details>
```
drop() on T1275:
0 DB::StorageDistributed::drop (this=0x7f9ed34f0000) at ../contrib/libcxx/include/__hash_table:966
1 0x000000000d557242 in DB::DatabaseOnDisk::dropTable (this=0x7f9fc22706d8, context=..., table_name=...)
at ../contrib/libcxx/include/new:340
2 0x000000000d6fcf7c in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f9e42560dc0, query=...)
at ../contrib/libcxx/include/memory:3826
3 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f9e42560dc0) at ../src/Interpreters/InterpreterDropQuery.cpp:50
4 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>,
stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420
5 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>,
may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487
6 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254
7 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326
8 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000)
at ../contrib/poco/Net/src/TCPServerConnection.cpp:43
9 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00)
at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114
10 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199
11 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>)
at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401
12 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477
13 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
StorageDistributedDirectoryMonitor on T166:
0 DB::StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor (this=0x7f9ea7ab1400, storage_=..., path_=...,
pool_=..., monitor_blocker_=..., bg_pool_=...) at ../src/Storages/Distributed/DirectoryMonitor.cpp:81
1 0x000000000dbf684e in std::__1::make_unique<> () at ../contrib/libcxx/include/memory:3474
2 DB::StorageDistributed::requireDirectoryMonitor (this=0x7f9ed34f0000, disk=..., name=...)
at ../src/Storages/StorageDistributed.cpp:682
3 0x000000000de3d5fa in DB::DistributedBlockOutputStream::writeToShard (this=this@entry=0x7f9ed39c7418, block=..., dir_names=...)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:634
4 0x000000000de3e214 in DB::DistributedBlockOutputStream::writeAsyncImpl (this=this@entry=0x7f9ed39c7418, block=...,
shard_id=shard_id@entry=79) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:539
5 0x000000000de3e47b in DB::DistributedBlockOutputStream::writeSplitAsync (this=this@entry=0x7f9ed39c7418, block=...)
at ../contrib/libcxx/include/vector:1546
6 0x000000000de3eab0 in DB::DistributedBlockOutputStream::writeAsync (block=..., this=0x7f9ed39c7418)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:141
7 DB::DistributedBlockOutputStream::write (this=0x7f9ed39c7418, block=...)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:135
8 0x000000000d73b376 in DB::PushingToViewsBlockOutputStream::write (this=this@entry=0x7f9ea7a8cf58, block=...)
at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:157
9 0x000000000d7853eb in DB::AddingDefaultBlockOutputStream::write (this=0x7f9ed383d118, block=...)
at ../contrib/libcxx/include/memory:3826
10 0x000000000d740790 in DB::SquashingBlockOutputStream::write (this=0x7f9ed383de18, block=...)
at ../contrib/libcxx/include/memory:3826
11 0x000000000d68c308 in DB::CountingBlockOutputStream::write (this=0x7f9ea7ac6d60, block=...)
at ../contrib/libcxx/include/memory:3826
12 0x000000000ddab449 in DB::StorageBuffer::writeBlockToDestination (this=this@entry=0x7f9fbd56a000, block=..., table=...)
at ../src/Storages/StorageBuffer.cpp:747
13 0x000000000ddabfa6 in DB::StorageBuffer::flushBuffer (this=this@entry=0x7f9fbd56a000, buffer=...,
check_thresholds=check_thresholds@entry=true, locked=locked@entry=false, reset_block_structure=reset_block_structure@entry=false)
at ../src/Storages/StorageBuffer.cpp:661
14 0x000000000ddac415 in DB::StorageBuffer::flushAllBuffers (reset_blocks_structure=false, check_thresholds=true, this=0x7f9fbd56a000)
at ../src/Storages/StorageBuffer.cpp:605
shutdown() on T1275:
0 DB::StorageDistributed::shutdown (this=0x7f9ed34f0000) at ../contrib/libcxx/include/atomic:1612
1 0x000000000d6fd938 in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f98530c79a0, query=...)
at ../src/Storages/TableLockHolder.h:12
2 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f98530c79a0) at ../src/Interpreters/InterpreterDropQuery.cpp:50
3 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>,
stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420
4 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>,
may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487
5 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254
6 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326
7 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000)
at ../contrib/poco/Net/src/TCPServerConnection.cpp:43
8 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00)
at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114
9 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199
10 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>)
at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401
11 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477
12 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
```
</details>
2020-10-26 20:01:06 +00:00
LOG_DEBUG ( log , " Background threads for async INSERT joined " ) ;
2014-08-13 11:26:13 +00:00
}
2020-07-16 20:35:23 +00:00
void StorageDistributed : : drop ( )
{
Fix DROP TABLE for Distributed (racy with INSERT)
<details>
```
drop() on T1275:
0 DB::StorageDistributed::drop (this=0x7f9ed34f0000) at ../contrib/libcxx/include/__hash_table:966
1 0x000000000d557242 in DB::DatabaseOnDisk::dropTable (this=0x7f9fc22706d8, context=..., table_name=...)
at ../contrib/libcxx/include/new:340
2 0x000000000d6fcf7c in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f9e42560dc0, query=...)
at ../contrib/libcxx/include/memory:3826
3 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f9e42560dc0) at ../src/Interpreters/InterpreterDropQuery.cpp:50
4 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>,
stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420
5 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>,
may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487
6 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254
7 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326
8 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000)
at ../contrib/poco/Net/src/TCPServerConnection.cpp:43
9 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00)
at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114
10 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199
11 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>)
at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401
12 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477
13 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
StorageDistributedDirectoryMonitor on T166:
0 DB::StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor (this=0x7f9ea7ab1400, storage_=..., path_=...,
pool_=..., monitor_blocker_=..., bg_pool_=...) at ../src/Storages/Distributed/DirectoryMonitor.cpp:81
1 0x000000000dbf684e in std::__1::make_unique<> () at ../contrib/libcxx/include/memory:3474
2 DB::StorageDistributed::requireDirectoryMonitor (this=0x7f9ed34f0000, disk=..., name=...)
at ../src/Storages/StorageDistributed.cpp:682
3 0x000000000de3d5fa in DB::DistributedBlockOutputStream::writeToShard (this=this@entry=0x7f9ed39c7418, block=..., dir_names=...)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:634
4 0x000000000de3e214 in DB::DistributedBlockOutputStream::writeAsyncImpl (this=this@entry=0x7f9ed39c7418, block=...,
shard_id=shard_id@entry=79) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:539
5 0x000000000de3e47b in DB::DistributedBlockOutputStream::writeSplitAsync (this=this@entry=0x7f9ed39c7418, block=...)
at ../contrib/libcxx/include/vector:1546
6 0x000000000de3eab0 in DB::DistributedBlockOutputStream::writeAsync (block=..., this=0x7f9ed39c7418)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:141
7 DB::DistributedBlockOutputStream::write (this=0x7f9ed39c7418, block=...)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:135
8 0x000000000d73b376 in DB::PushingToViewsBlockOutputStream::write (this=this@entry=0x7f9ea7a8cf58, block=...)
at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:157
9 0x000000000d7853eb in DB::AddingDefaultBlockOutputStream::write (this=0x7f9ed383d118, block=...)
at ../contrib/libcxx/include/memory:3826
10 0x000000000d740790 in DB::SquashingBlockOutputStream::write (this=0x7f9ed383de18, block=...)
at ../contrib/libcxx/include/memory:3826
11 0x000000000d68c308 in DB::CountingBlockOutputStream::write (this=0x7f9ea7ac6d60, block=...)
at ../contrib/libcxx/include/memory:3826
12 0x000000000ddab449 in DB::StorageBuffer::writeBlockToDestination (this=this@entry=0x7f9fbd56a000, block=..., table=...)
at ../src/Storages/StorageBuffer.cpp:747
13 0x000000000ddabfa6 in DB::StorageBuffer::flushBuffer (this=this@entry=0x7f9fbd56a000, buffer=...,
check_thresholds=check_thresholds@entry=true, locked=locked@entry=false, reset_block_structure=reset_block_structure@entry=false)
at ../src/Storages/StorageBuffer.cpp:661
14 0x000000000ddac415 in DB::StorageBuffer::flushAllBuffers (reset_blocks_structure=false, check_thresholds=true, this=0x7f9fbd56a000)
at ../src/Storages/StorageBuffer.cpp:605
shutdown() on T1275:
0 DB::StorageDistributed::shutdown (this=0x7f9ed34f0000) at ../contrib/libcxx/include/atomic:1612
1 0x000000000d6fd938 in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f98530c79a0, query=...)
at ../src/Storages/TableLockHolder.h:12
2 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f98530c79a0) at ../src/Interpreters/InterpreterDropQuery.cpp:50
3 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>,
stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420
4 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>,
may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487
5 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254
6 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326
7 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000)
at ../contrib/poco/Net/src/TCPServerConnection.cpp:43
8 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00)
at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114
9 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199
10 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>)
at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401
11 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477
12 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
```
</details>
2020-10-26 20:01:06 +00:00
// Some INSERT in-between shutdown() and drop() can call
// requireDirectoryMonitor() again, so call shutdown() to clear them, but
// when the drop() (this function) executed none of INSERT is allowed in
// parallel.
//
// And second time shutdown() should be fast, since none of
// DirectoryMonitor should do anything, because ActionBlocker is canceled
// (in shutdown()).
shutdown ( ) ;
2020-07-16 20:35:23 +00:00
// Distributed table w/o sharding_key does not allows INSERTs
if ( relative_data_path . empty ( ) )
return ;
LOG_DEBUG ( log , " Removing pending blocks for async INSERT from filesystem on DROP TABLE " ) ;
2020-09-15 09:26:56 +00:00
auto disks = data_volume - > getDisks ( ) ;
2020-07-16 20:35:23 +00:00
for ( const auto & disk : disks )
disk - > removeRecursive ( relative_data_path ) ;
LOG_DEBUG ( log , " Removed " ) ;
}
2014-08-13 11:26:13 +00:00
2020-01-20 17:54:52 +00:00
/// Returns the table's data directory on each disk of the data volume
/// (disk path followed by `relative_data_path`).
/// The result is empty when `relative_data_path` is empty.
Strings StorageDistributed::getDataPaths() const
{
    Strings result;

    if (!relative_data_path.empty())
    {
        for (const DiskPtr & disk : data_volume->getDisks())
            result.push_back(disk->getPath() + relative_data_path);
    }

    return result;
}
2017-03-09 03:34:09 +00:00
2020-06-18 16:10:47 +00:00
void StorageDistributed : : truncate ( const ASTPtr & , const StorageMetadataPtr & , const Context & , TableExclusiveLockHolder & )
2018-04-21 00:35:20 +00:00
{
2018-06-09 15:48:22 +00:00
std : : lock_guard lock ( cluster_nodes_mutex ) ;
2018-04-21 00:35:20 +00:00
2020-07-16 20:35:23 +00:00
LOG_DEBUG ( log , " Removing pending blocks for async INSERT from filesystem on TRUNCATE TABLE " ) ;
2018-06-09 15:48:22 +00:00
for ( auto it = cluster_nodes_data . begin ( ) ; it ! = cluster_nodes_data . end ( ) ; )
2018-04-21 00:35:20 +00:00
{
2018-06-09 15:48:22 +00:00
it - > second . shutdownAndDropAllData ( ) ;
it = cluster_nodes_data . erase ( it ) ;
2018-04-21 00:35:20 +00:00
}
2020-07-16 20:35:23 +00:00
LOG_DEBUG ( log , " Removed " ) ;
2018-04-21 00:35:20 +00:00
}
2017-03-09 03:34:09 +00:00
2020-03-29 07:43:40 +00:00
/// Returns the storage policy held by this table.
/// It may be empty: startup() explicitly handles the case when no policy is set.
StoragePolicyPtr StorageDistributed::getStoragePolicy() const
{
    return storage_policy;
}
2020-01-20 17:54:52 +00:00
void StorageDistributed : : createDirectoryMonitors ( const std : : string & disk )
2014-08-13 11:26:13 +00:00
{
2020-01-20 17:54:52 +00:00
const std : : string path ( disk + relative_data_path ) ;
2019-12-24 13:29:53 +00:00
Poco : : File { path } . createDirectories ( ) ;
2014-08-14 11:50:36 +00:00
2019-07-31 22:37:41 +00:00
std : : filesystem : : directory_iterator begin ( path ) ;
std : : filesystem : : directory_iterator end ;
2017-05-10 06:39:37 +00:00
for ( auto it = begin ; it ! = end ; + + it )
2020-11-04 18:58:43 +00:00
{
const auto & dir_path = it - > path ( ) ;
if ( std : : filesystem : : is_directory ( dir_path ) )
{
const auto & tmp_path = dir_path / " tmp " ;
/// "tmp" created by DistributedBlockOutputStream
if ( std : : filesystem : : is_directory ( tmp_path ) & & std : : filesystem : : is_empty ( tmp_path ) )
std : : filesystem : : remove ( tmp_path ) ;
if ( std : : filesystem : : is_empty ( dir_path ) )
{
2020-11-22 17:13:40 +00:00
LOG_DEBUG ( log , " Removing {} (used for async INSERT into Distributed) " , dir_path . string ( ) ) ;
2020-11-04 18:58:43 +00:00
/// Will be created by DistributedBlockOutputStream on demand.
std : : filesystem : : remove ( dir_path ) ;
}
else
{
requireDirectoryMonitor ( disk , dir_path . filename ( ) . string ( ) ) ;
}
}
}
2014-08-13 11:26:13 +00:00
}
2017-03-09 03:34:09 +00:00
2020-04-14 18:12:08 +00:00
/// Returns the directory monitor for directory `name` on the disk with path `disk`,
/// creating it (together with its connection pool) on first use.
///
/// The monitor watches `disk + relative_data_path + name`; entries of `cluster_nodes_data`
/// are keyed by `disk + name`. The lookup and the creation are done under `cluster_nodes_mutex`.
///
/// If creating the pool or the monitor throws, the exception is propagated and no entry
/// for the key is left in `cluster_nodes_data`.
StorageDistributedDirectoryMonitor & StorageDistributed::requireDirectoryMonitor(const std::string & disk, const std::string & name)
{
    const std::string path(disk + relative_data_path + name);
    const std::string key(disk + name);

    std::lock_guard lock(cluster_nodes_mutex);

    /// operator[] inserts a default-constructed entry when the key is absent.
    auto & node_data = cluster_nodes_data[key];
    if (!node_data.directory_monitor)
    {
        try
        {
            node_data.connection_pool = StorageDistributedDirectoryMonitor::createPool(name, *this);
            node_data.directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(
                *this, path, node_data.connection_pool, monitors_blocker, global_context.getDistributedSchedulePool());
        }
        catch (...)
        {
            /// Do not leave an entry without a monitor behind: other methods iterate over
            /// cluster_nodes_data and dereference directory_monitor unconditionally.
            cluster_nodes_data.erase(key);
            throw;
        }
    }

    return *node_data.directory_monitor;
}
2020-06-03 23:50:47 +00:00
std : : vector < StorageDistributedDirectoryMonitor : : Status > StorageDistributed : : getDirectoryMonitorsStatuses ( ) const
2020-06-02 23:47:32 +00:00
{
2020-06-03 23:50:47 +00:00
std : : vector < StorageDistributedDirectoryMonitor : : Status > statuses ;
std : : lock_guard lock ( cluster_nodes_mutex ) ;
2020-06-06 15:57:52 +00:00
statuses . reserve ( cluster_nodes_data . size ( ) ) ;
2020-06-04 17:23:46 +00:00
for ( const auto & node : cluster_nodes_data )
2020-06-03 23:50:47 +00:00
statuses . push_back ( node . second . directory_monitor - > getStatus ( ) ) ;
return statuses ;
2020-06-02 23:47:32 +00:00
}
2015-09-18 13:36:10 +00:00
size_t StorageDistributed : : getShardCount ( ) const
{
2018-03-16 02:08:31 +00:00
return getCluster ( ) - > getShardCount ( ) ;
2016-10-10 08:44:52 +00:00
}
2020-01-20 17:54:52 +00:00
/// Returns the cluster this table works with: `owned_cluster` when it is set,
/// otherwise the cluster looked up by `cluster_name` in the global context.
ClusterPtr StorageDistributed::getCluster() const
{
    if (owned_cluster)
        return owned_cluster;

    return global_context.getCluster(cluster_name);
}
2020-06-17 16:39:58 +00:00
/// Returns the cluster to use for the query, with unused shards skipped when that is possible.
///
/// Skipping is attempted only if the table has a sharding key and that key is usable, i.e. it is
/// deterministic or `allow_nondeterministic_optimize_skip_unused_shards` is enabled.
/// If no reduced cluster was obtained, the full cluster is returned, unless
/// `force_optimize_skip_unused_shards` demands otherwise:
///  - FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_ALWAYS: always throws;
///  - FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_HAS_SHARDING_KEY: throws only if the table has a sharding key.
/// The exception has the code UNABLE_TO_SKIP_UNUSED_SHARDS.
ClusterPtr StorageDistributed::getOptimizedCluster(const Context & context, const StorageMetadataPtr & metadata_snapshot, const ASTPtr & query_ptr) const
{
    ClusterPtr cluster = getCluster();
    const Settings & settings = context.getSettingsRef();

    bool sharding_key_is_usable = settings.allow_nondeterministic_optimize_skip_unused_shards || sharding_key_is_deterministic;

    if (has_sharding_key && sharding_key_is_usable)
    {
        if (ClusterPtr optimized = skipUnusedShards(cluster, query_ptr, metadata_snapshot, context))
            return optimized;
    }

    UInt64 force = settings.force_optimize_skip_unused_shards;
    if (!force)
        return cluster;

    /// Explain why the shards could not be skipped.
    WriteBufferFromOwnString exception_message;
    if (!has_sharding_key)
        exception_message << " No sharding key ";
    else if (!sharding_key_is_usable)
        exception_message << " Sharding key is not deterministic ";
    else
        exception_message << " Sharding key " << sharding_key_column_name << " is not used ";

    const bool forced_always = force == FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_ALWAYS;
    const bool forced_with_sharding_key = force == FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_HAS_SHARDING_KEY && has_sharding_key;
    if (forced_always || forced_with_sharding_key)
        throw Exception(exception_message.str(), ErrorCodes::UNABLE_TO_SKIP_UNUSED_SHARDS);

    return cluster;
}
2020-04-22 06:22:14 +00:00
void StorageDistributed : : ClusterNodeData : : flushAllData ( ) const
2019-04-08 05:13:16 +00:00
{
2019-05-10 04:19:02 +00:00
directory_monitor - > flushAllData ( ) ;
2017-07-27 15:24:39 +00:00
}
2020-04-22 06:22:14 +00:00
void StorageDistributed : : ClusterNodeData : : shutdownAndDropAllData ( ) const
2018-04-21 00:35:20 +00:00
{
directory_monitor - > shutdownAndDropAllData ( ) ;
}
2020-05-01 08:31:05 +00:00
/// Builds a block selector from the evaluated sharding key column `result`,
/// using the slot-to-shard mapping of `cluster`.
///
/// Supported types of `result` are UInt8/16/32/64 and Int8/16/32/64, either directly or as the
/// dictionary type of a LowCardinality type (in which case the column is converted to a full one).
/// Throws an exception with the code TYPE_MISMATCH for any other type.
IColumn::Selector StorageDistributed::createSelector(const ClusterPtr cluster, const ColumnWithTypeAndName & result)
{
    const auto & slot_to_shard = cluster->getSlotToShard();

    /// Non-null only when result.type is DataTypeLowCardinality; sharding is then done by its dictionary type.
    const auto * low_cardinality_type = typeid_cast<const DataTypeLowCardinality *>(result.type.get());

#define CREATE_FOR_TYPE(TYPE) \
    if (typeid_cast<const DataType##TYPE *>(result.type.get())) \
        return createBlockSelector<TYPE>(*result.column, slot_to_shard); \
    if (low_cardinality_type && typeid_cast<const DataType##TYPE *>(low_cardinality_type->getDictionaryType().get())) \
        return createBlockSelector<TYPE>(*result.column->convertToFullColumnIfLowCardinality(), slot_to_shard);

    CREATE_FOR_TYPE(UInt8)
    CREATE_FOR_TYPE(UInt16)
    CREATE_FOR_TYPE(UInt32)
    CREATE_FOR_TYPE(UInt64)
    CREATE_FOR_TYPE(Int8)
    CREATE_FOR_TYPE(Int16)
    CREATE_FOR_TYPE(Int32)
    CREATE_FOR_TYPE(Int64)

#undef CREATE_FOR_TYPE

    throw Exception{" Sharding key expression does not evaluate to an integer type ", ErrorCodes::TYPE_MISMATCH};
}
2018-12-19 12:38:13 +00:00
/// Returns a new cluster with fewer shards if constant folding for `sharding_key_expr` is possible
2019-08-19 20:28:24 +00:00
/// using constraints from "PREWHERE" and "WHERE" conditions, otherwise returns `nullptr`
2020-06-17 16:39:58 +00:00
ClusterPtr StorageDistributed : : skipUnusedShards (
ClusterPtr cluster ,
const ASTPtr & query_ptr ,
const StorageMetadataPtr & metadata_snapshot ,
const Context & context ) const
2018-12-19 12:38:13 +00:00
{
2020-03-24 07:51:54 +00:00
const auto & select = query_ptr - > as < ASTSelectQuery & > ( ) ;
2018-12-19 12:38:13 +00:00
2019-08-19 20:28:24 +00:00
if ( ! select . prewhere ( ) & & ! select . where ( ) )
{
2018-12-19 12:38:13 +00:00
return nullptr ;
2019-08-19 20:28:24 +00:00
}
ASTPtr condition_ast ;
if ( select . prewhere ( ) & & select . where ( ) )
{
condition_ast = makeASTFunction ( " and " , select . prewhere ( ) - > clone ( ) , select . where ( ) - > clone ( ) ) ;
}
else
{
condition_ast = select . prewhere ( ) ? select . prewhere ( ) - > clone ( ) : select . where ( ) - > clone ( ) ;
}
2018-12-19 12:38:13 +00:00
2020-06-18 09:08:24 +00:00
replaceConstantExpressions ( condition_ast , context , metadata_snapshot - > getColumns ( ) . getAll ( ) , shared_from_this ( ) , metadata_snapshot ) ;
2019-08-19 20:28:24 +00:00
const auto blocks = evaluateExpressionOverConstantCondition ( condition_ast , sharding_key_expr ) ;
2018-12-19 12:38:13 +00:00
// Can't get definite answer if we can skip any shards
if ( ! blocks )
{
return nullptr ;
}
std : : set < int > shards ;
for ( const auto & block : * blocks )
{
if ( ! block . has ( sharding_key_column_name ) )
throw Exception ( " sharding_key_expr should evaluate as a single row " , ErrorCodes : : TOO_MANY_ROWS ) ;
2020-03-18 03:27:32 +00:00
const ColumnWithTypeAndName & result = block . getByName ( sharding_key_column_name ) ;
2018-12-19 12:38:13 +00:00
const auto selector = createSelector ( cluster , result ) ;
shards . insert ( selector . begin ( ) , selector . end ( ) ) ;
}
return cluster - > getClusterWithMultipleShards ( { shards . begin ( ) , shards . end ( ) } ) ;
}
2019-04-08 05:13:16 +00:00
/// Returns a lock obtained from `monitors_blocker.cancel()` for the DistributedSend action type
/// and an empty (default-constructed) lock for every other type.
ActionLock StorageDistributed::getActionLock(StorageActionBlockType type)
{
    if (type != ActionLocks::DistributedSend)
        return {};

    return monitors_blocker.cancel();
}
2019-05-10 04:19:02 +00:00
void StorageDistributed : : flushClusterNodesAllData ( )
2019-04-08 05:13:16 +00:00
{
std : : lock_guard lock ( cluster_nodes_mutex ) ;
/// TODO: Maybe it should be executed in parallel
2020-03-09 01:03:43 +00:00
for ( auto & node : cluster_nodes_data )
node . second . flushAllData ( ) ;
2019-04-08 05:13:16 +00:00
}
2020-04-07 14:05:51 +00:00
void StorageDistributed : : rename ( const String & new_path_to_table_data , const StorageID & new_table_id )
2019-12-19 19:39:49 +00:00
{
2020-09-18 19:25:56 +00:00
assert ( relative_data_path ! = new_path_to_table_data ) ;
if ( ! relative_data_path . empty ( ) )
2020-01-20 17:54:52 +00:00
renameOnDisk ( new_path_to_table_data ) ;
2020-04-07 14:05:51 +00:00
renameInMemory ( new_table_id ) ;
2020-01-20 17:54:52 +00:00
}
2020-07-23 14:10:48 +00:00
2020-01-20 17:54:52 +00:00
void StorageDistributed : : renameOnDisk ( const String & new_path_to_table_data )
{
2020-09-15 09:26:56 +00:00
for ( const DiskPtr & disk : data_volume - > getDisks ( ) )
2019-12-19 19:39:49 +00:00
{
2020-09-18 19:08:53 +00:00
disk - > moveDirectory ( relative_data_path , new_path_to_table_data ) ;
2020-01-20 17:54:52 +00:00
2020-09-18 19:08:53 +00:00
auto new_path = disk - > getPath ( ) + new_path_to_table_data ;
2020-05-23 22:24:01 +00:00
LOG_DEBUG ( log , " Updating path to {} " , new_path ) ;
2020-01-20 17:54:52 +00:00
2019-12-19 19:39:49 +00:00
std : : lock_guard lock ( cluster_nodes_mutex ) ;
for ( auto & node : cluster_nodes_data )
2020-01-20 17:54:52 +00:00
node . second . directory_monitor - > updatePath ( new_path ) ;
2019-12-19 19:39:49 +00:00
}
2020-01-20 17:54:52 +00:00
relative_data_path = new_path_to_table_data ;
2019-12-19 19:39:49 +00:00
}
2017-12-30 00:36:06 +00:00
/// Registers the Distributed table engine in the given StorageFactory.
void registerStorageDistributed(StorageFactory & factory)
{
    /** Engine arguments, in order:
      * - name of cluster in configuration;
      * - name of remote database;
      * - name of remote table;
      * - sharding key expression (optional);
      * - policy to store data in (optional).
      *
      * Remote database may be specified in following form:
      * - identifier;
      * - constant expression with string result, like currentDatabase();
      * -- string literal as specific case;
      * - empty string means 'use default database from cluster'.
      */
    auto create_storage = [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;
        const size_t num_args = engine_args.size();

        if (num_args < 3 || num_args > 5)
            throw Exception(
                " Storage Distributed requires from 3 to 5 parameters - "
                " name of configuration section with list of remote servers, "
                " name of remote database, "
                " name of remote table, "
                " sharding key expression (optional), "
                " policy to store data in (optional). ",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        String cluster_name = getClusterNameAndMakeLiteral(engine_args[0]);

        /// Database and table arguments are replaced in place by their literal form
        /// before the string values are extracted.
        engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.local_context);
        engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);

        String remote_database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
        String remote_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();

        /// Optional arguments: a null sharding key and the fallback policy name are used when absent.
        const auto sharding_key = num_args >= 4 ? engine_args[3] : nullptr;

        String storage_policy = " default ";
        if (num_args >= 5)
            storage_policy = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>();

        /// Check that sharding_key exists in the table and has numeric type.
        if (sharding_key)
        {
            auto sharding_expr = buildShardingKeyExpression(sharding_key, args.context, args.columns.getAllPhysical(), true);
            const Block & sample = sharding_expr->getSampleBlock();

            if (sample.columns() != 1)
                throw Exception(" Sharding expression must return exactly one column ", ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);

            auto key_type = sample.getByPosition(0).type;
            if (!key_type->isValueRepresentedByInteger())
                throw Exception(" Sharding expression has type " + key_type->getName() +
                    " , but should be one of integer type ", ErrorCodes::TYPE_MISMATCH);
        }

        return StorageDistributed::create(
            args.table_id, args.columns, args.constraints,
            remote_database, remote_table, cluster_name,
            args.context,
            sharding_key,
            storage_policy,
            args.relative_data_path,
            args.attach);
    };

    factory.registerStorage(" Distributed ", create_storage,
    {
        .source_access_type = AccessType::REMOTE,
    });
}
2012-05-21 20:38:34 +00:00
}