#include <port/unistd.h>
#include <stdlib.h>
#include <fcntl.h>
#include <signal.h>
#include <time.h>
#include <iostream>
#include <fstream>
#include <iomanip>
#include <random>
#include <pcg_random.hpp>
#include <Poco/File.h>
#include <Poco/Util/Application.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadPool.h>
#include <AggregateFunctions/ReservoirSampler.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <boost/program_options.hpp>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/Exception.h>
#include <Common/randomSeed.h>
#include <Core/Types.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/UseSSL.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Client/Connection.h>
#include <Common/InterruptListener.h>
#include <Common/Config/configReadClient.h>

/** A tool for evaluating ClickHouse performance.
  * The tool emulates a case with a fixed number of simultaneously executing queries.
  */
namespace DB
{

namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int EMPTY_DATA_PASSED;
    extern const int CANNOT_BLOCK_SIGNAL;
}

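/// Reads queries from stdin, feeds them to a pool of worker threads through a bounded queue,
/// and collects per-interval and total execution statistics.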
class Benchmark : public Poco::Util::Application
{
public:
    Benchmark(unsigned concurrency_, double delay_,
            const String & host_, UInt16 port_, bool secure_, const String & default_database_,
            const String & user_, const String & password_, const String & stage,
            bool randomize_, size_t max_iterations_, double max_time_,
            const String & json_path_, const Settings & settings_)
        :
        concurrency(concurrency_), delay(delay_), queue(concurrency),
        connections(concurrency, host_, port_, default_database_, user_, password_, "benchmark",
            Protocol::Compression::Enable, secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable),
        randomize(randomize_), max_iterations(max_iterations_), max_time(max_time_),
        json_path(json_path_), settings(settings_), global_context(Context::createGlobal()), pool(concurrency)
    {
        global_context.makeGlobalContext();

        std::cerr << std::fixed << std::setprecision(3);

        /// This is needed to receive blocks with columns of AggregateFunction data type
        /// (example: when using stage = 'with_mergeable_state')
        registerAggregateFunctions();

        if (stage == "complete")
            query_processing_stage = QueryProcessingStage::Complete;
        else if (stage == "fetch_columns")
            query_processing_stage = QueryProcessingStage::FetchColumns;
        else if (stage == "with_mergeable_state")
            query_processing_stage = QueryProcessingStage::WithMergeableState;
        else
            throw Exception("Unknown query processing stage: " + stage, ErrorCodes::BAD_ARGUMENTS);
    }
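    /// Pick up defaults from the usual clickhouse-client configuration in the user's home directory, if present.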
    void initialize(Poco::Util::Application & self [[maybe_unused]])
    {
        std::string home_path;
        const char * home_path_cstr = getenv("HOME");
        if (home_path_cstr)
            home_path = home_path_cstr;
        configReadClient(config(), home_path);
    }
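    /// Application entry point: clear any previous JSON report, load queries and run the benchmark.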
    int main(const std::vector<std::string> &)
    {
        if (!json_path.empty() && Poco::File(json_path).exists()) /// Clear file with previous results
            Poco::File(json_path).remove();

        readQueries();
        runBenchmark();
        return 0;
    }

private:
    using Query = std::string;

    unsigned concurrency;
    double delay;

    using Queries = std::vector<Query>;
    Queries queries;

    using Queue = ConcurrentBoundedQueue<Query>;
    Queue queue;

    ConnectionPool connections;
    bool randomize;
    size_t max_iterations;
    double max_time;
    String json_path;
    Settings settings;
    Context global_context;
    QueryProcessingStage::Enum query_processing_stage;

    /// Don't execute new queries after timelimit or SIGINT or exception
    std::atomic<bool> shutdown{false};

    std::atomic<size_t> queries_executed{0};

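    /// Counters plus a reservoir sample of query latencies; one instance tracks the current
    /// reporting interval and another the whole run.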
    struct Stats
    {
        Stopwatch watch;
        std::atomic<size_t> queries{0};
        size_t read_rows = 0;
        size_t read_bytes = 0;
        size_t result_rows = 0;
        size_t result_bytes = 0;

        using Sampler = ReservoirSampler<double>;
        Sampler sampler {1 << 16};

        void add(double seconds, size_t read_rows_inc, size_t read_bytes_inc, size_t result_rows_inc, size_t result_bytes_inc)
        {
            ++queries;
            read_rows += read_rows_inc;
            read_bytes += read_bytes_inc;
            result_rows += result_rows_inc;
            result_bytes += result_bytes_inc;
            sampler.insert(seconds);
        }

        void clear()
        {
            watch.restart();
            queries = 0;
            read_rows = 0;
            read_bytes = 0;
            result_rows = 0;
            result_bytes = 0;
            sampler.clear();
        }
    };

    Stats info_per_interval;
    Stats info_total;
    Stopwatch delay_watch;

    std::mutex mutex;

    ThreadPool pool;

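    /// Read newline-separated queries from stdin; an empty list is an error.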
    void readQueries()
    {
        ReadBufferFromFileDescriptor in(STDIN_FILENO);

        while (!in.eof())
        {
            std::string query;
            readText(query, in);
            assertChar('\n', in);

            if (!query.empty())
                queries.emplace_back(query);
        }

        if (queries.empty())
            throw Exception("Empty list of queries.", ErrorCodes::EMPTY_DATA_PASSED);

        std::cerr << "Loaded " << queries.size() << " queries.\n";
    }
    void printNumberOfQueriesExecuted(size_t num)
    {
        std::cerr << "\nQueries executed: " << num;
        if (queries.size() > 1)
            std::cerr << " (" << (num * 100.0 / queries.size()) << "%)";
        std::cerr << ".\n";
    }
    /// Try to push a new query and check cancellation conditions
    bool tryPushQueryInteractively(const String & query, InterruptListener & interrupt_listener)
    {
        bool inserted = false;

        while (!inserted)
        {
            inserted = queue.tryPush(query, 100);

            if (shutdown)
            {
                /// An exception occurred in a worker
                return false;
            }

            if (max_time > 0 && info_total.watch.elapsedSeconds() >= max_time)
            {
                std::cout << "Stopping launch of queries. Requested time limit is exhausted.\n";
                return false;
            }

            if (interrupt_listener.check())
            {
                std::cout << "Stopping launch of queries. SIGINT received.\n";
                return false;
            }

            if (delay > 0 && delay_watch.elapsedSeconds() > delay)
            {
                printNumberOfQueriesExecuted(info_total.queries);
                report(info_per_interval);
                delay_watch.restart();
            }
        }

        return true;
    }
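    /// Start `concurrency` worker threads, each with its own connection, then feed the queue until
    /// the iteration or time limit is hit or SIGINT arrives; finally wait for the workers and print the totals.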
    void runBenchmark()
    {
        pcg64 generator(randomSeed());
        std::uniform_int_distribution<size_t> distribution(0, queries.size() - 1);

        for (size_t i = 0; i < concurrency; ++i)
            pool.schedule(std::bind(&Benchmark::thread, this,
                connections.get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));

        InterruptListener interrupt_listener;
        info_per_interval.watch.restart();
        delay_watch.restart();

        /// Push queries into queue
        for (size_t i = 0; !max_iterations || i < max_iterations; ++i)
        {
            size_t query_index = randomize ? distribution(generator) : i % queries.size();

            if (!tryPushQueryInteractively(queries[query_index], interrupt_listener))
            {
                shutdown = true;
                break;
            }
        }

        pool.wait();
        info_total.watch.stop();

        if (!json_path.empty())
            reportJSON(info_total, json_path);

        printNumberOfQueriesExecuted(info_total.queries);
        report(info_total);
    }
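    /// Worker loop: pop queries from the queue and execute them until shutdown is requested
    /// or the requested number of iterations has been reached.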
    void thread(ConnectionPool::Entry connection)
    {
        Query query;

        try
        {
            /// In these threads we do not accept INT signal.
            sigset_t sig_set;
            if (sigemptyset(&sig_set)
                || sigaddset(&sig_set, SIGINT)
                || pthread_sigmask(SIG_BLOCK, &sig_set, nullptr))
                throwFromErrno("Cannot block signal.", ErrorCodes::CANNOT_BLOCK_SIGNAL);

            while (true)
            {
                bool extracted = false;

                while (!extracted)
                {
                    extracted = queue.tryPop(query, 100);

                    if (shutdown || (max_iterations && queries_executed == max_iterations))
                        return;
                }

                execute(connection, query);
                ++queries_executed;
            }
        }
        catch (...)
        {
            shutdown = true;
            std::cerr << "An error occurred while processing query:\n" << query << "\n";
            throw;
        }
    }
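    /// Run a single query remotely, drain the result stream, and record timing and volume statistics.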
    void execute(ConnectionPool::Entry & connection, Query & query)
    {
        Stopwatch watch;
        RemoteBlockInputStream stream(
            *connection,
            query, {}, global_context, &settings, nullptr, Tables(), query_processing_stage);

        Progress progress;
        stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });

        stream.readPrefix();
        while (Block block = stream.read())
            ;
        stream.readSuffix();

        const BlockStreamProfileInfo & info = stream.getProfileInfo();

        double seconds = watch.elapsedSeconds();

        std::lock_guard lock(mutex);
        info_per_interval.add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
        info_total.add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
    }
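    /// Print throughput figures and latency percentiles to stderr, then reset the given counters.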
    void report(Stats & info)
    {
        std::lock_guard lock(mutex);

        /// Avoid zeros, nans or exceptions
        if (0 == info.queries)
            return;

        double seconds = info.watch.elapsedSeconds();

        std::cerr
            << "\n"
            << "QPS: " << (info.queries / seconds) << ", "
            << "RPS: " << (info.read_rows / seconds) << ", "
            << "MiB/s: " << (info.read_bytes / seconds / 1048576) << ", "
            << "result RPS: " << (info.result_rows / seconds) << ", "
            << "result MiB/s: " << (info.result_bytes / seconds / 1048576) << "."
            << "\n";

        auto print_percentile = [&](double percent)
        {
            std::cerr << percent << "%\t" << info.sampler.quantileInterpolated(percent / 100.0) << " sec." << std::endl;
        };

        for (int percent = 0; percent <= 90; percent += 10)
            print_percentile(percent);

        print_percentile(95);
        print_percentile(99);
        print_percentile(99.9);
        print_percentile(99.99);

        info.clear();
    }
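    /// Write the final statistics and latency percentiles to the given file as JSON.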
    void reportJSON(Stats & info, const std::string & filename)
    {
        WriteBufferFromFile json_out(filename);

        std::lock_guard lock(mutex);

        auto print_key_value = [&](auto key, auto value, bool with_comma = true)
        {
            json_out << double_quote << key << ": " << value << (with_comma ? ",\n" : "\n");
        };

        auto print_percentile = [&json_out, &info](auto percent, bool with_comma = true)
        {
            json_out << "\"" << percent << "\"" << ": " << info.sampler.quantileInterpolated(percent / 100.0) << (with_comma ? ",\n" : "\n");
        };

        json_out << "{\n";

        json_out << double_quote << "statistics" << ": {\n";

        double seconds = info.watch.elapsedSeconds();
        print_key_value("QPS", info.queries / seconds);
        print_key_value("RPS", info.read_rows / seconds);
        print_key_value("MiBPS", info.read_bytes / seconds);
        print_key_value("RPS_result", info.result_rows / seconds);
        print_key_value("MiBPS_result", info.result_bytes / seconds);
        print_key_value("num_queries", info.queries.load(), false);

        json_out << "},\n";

        json_out << double_quote << "query_time_percentiles" << ": {\n";

        for (int percent = 0; percent <= 90; percent += 10)
            print_percentile(percent);

        print_percentile(95);
        print_percentile(99);
        print_percentile(99.9);
        print_percentile(99.99, false);

        json_out << "}\n";

        json_out << "}\n";
    }
public:

    ~Benchmark()
    {
        shutdown = true;
    }
};

}

#ifndef __clang__
#pragma GCC optimize("-fno-var-tracking-assignments")
#endif
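/// Entry point of clickhouse-benchmark: parse command line options and run the Benchmark application.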
int mainEntryClickHouseBenchmark(int argc, char ** argv)
{
    using namespace DB;
    bool print_stacktrace = true;

    try
    {
        using boost::program_options::value;
        boost::program_options::options_description desc("Allowed options");
        desc.add_options()
            ("help",                                                           "produce help message")
            ("concurrency,c", value<unsigned>()->default_value(1),             "number of parallel queries")
            ("delay,d",       value<double>()->default_value(1),               "delay between intermediate reports in seconds (set 0 to disable reports)")
            ("stage",         value<std::string>()->default_value("complete"), "request query processing up to specified stage: complete,fetch_columns,with_mergeable_state")
            ("iterations,i",  value<size_t>()->default_value(0),               "number of queries to be executed")
            ("timelimit,t",   value<double>()->default_value(0.),              "stop launch of queries after specified time limit")
            ("randomize,r",   value<bool>()->default_value(false),             "randomize order of execution")
            ("json",          value<std::string>()->default_value(""),         "write final report to specified file in JSON format")
            ("host,h",        value<std::string>()->default_value("localhost"), "")
            ("port",          value<UInt16>()->default_value(9000),            "")
            ("secure,s",                                                       "Use TLS connection")
            ("user",          value<std::string>()->default_value("default"),  "")
            ("password",      value<std::string>()->default_value(""),         "")
            ("database",      value<std::string>()->default_value("default"),  "")
            ("stacktrace",                                                     "print stack traces of exceptions")
        ;
        Settings settings;
        settings.addProgramOptions(desc);

        boost::program_options::variables_map options;
        boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options);
        boost::program_options::notify(options);

        if (options.count("help"))
        {
            std::cout << "Usage: " << argv[0] << " [options] < queries.txt\n";
            std::cout << desc << "\n";
            return 1;
        }

        print_stacktrace = options.count("stacktrace");

        UseSSL use_ssl;
        Benchmark benchmark(
            options["concurrency"].as<unsigned>(),
            options["delay"].as<double>(),
            options["host"].as<std::string>(),
            options["port"].as<UInt16>(),
            options.count("secure"),
            options["database"].as<std::string>(),
            options["user"].as<std::string>(),
            options["password"].as<std::string>(),
            options["stage"].as<std::string>(),
            options["randomize"].as<bool>(),
            options["iterations"].as<size_t>(),
            options["timelimit"].as<double>(),
            options["json"].as<std::string>(),
            settings);

        return benchmark.run();
    }
    catch (...)
    {
        std::cerr << getCurrentExceptionMessage(print_stacktrace, true) << std::endl;
        return getCurrentExceptionCode();
    }
}