Speed up build by removing old example programs

This commit is contained in:
Alexey Milovidov 2020-06-06 19:19:16 +03:00
parent 3e84b17cf8
commit d3d02a67f4
29 changed files with 132 additions and 2812 deletions

View File

@ -1,13 +1,4 @@
set(SRCS) set(SRCS)
add_executable (expression_stream expression_stream.cpp ${SRCS})
target_link_libraries (expression_stream PRIVATE dbms clickhouse_storages_system clickhouse_parsers)
add_executable (filter_stream filter_stream.cpp ${SRCS})
target_link_libraries (filter_stream PRIVATE dbms clickhouse_storages_system clickhouse_parsers clickhouse_common_io)
add_executable (union_stream2 union_stream2.cpp ${SRCS})
target_link_libraries (union_stream2 PRIVATE dbms)
add_executable (finish_sorting_stream finish_sorting_stream.cpp ${SRCS}) add_executable (finish_sorting_stream finish_sorting_stream.cpp ${SRCS})
target_link_libraries (finish_sorting_stream PRIVATE dbms) target_link_libraries (finish_sorting_stream PRIVATE dbms)

View File

@ -1,86 +0,0 @@
#include <iostream>
#include <iomanip>
#include <IO/WriteBufferFromOStream.h>
#include <IO/ReadHelpers.h>
#include <Storages/System/StorageSystemNumbers.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <Formats/FormatFactory.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypesNumber.h>
#include <Parsers/ParserSelectQuery.h>
#include <Parsers/parseQuery.h>
#include <Interpreters/SyntaxAnalyzer.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/Context.h>
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
int main(int argc, char ** argv)
try
{
using namespace DB;
size_t n = argc == 2 ? parse<UInt64>(argv[1]) : 10ULL;
std::string input = "SELECT number, number / 3, number * number";
ParserSelectQuery parser;
ASTPtr ast = parseQuery(parser, input.data(), input.data() + input.size(), "", 0, 0);
SharedContextHolder shared_context = Context::createShared();
Context context = Context::createGlobal(shared_context.get());
context.makeGlobalContext();
NamesAndTypesList source_columns = {{"number", std::make_shared<DataTypeUInt64>()}};
auto syntax_result = SyntaxAnalyzer(context).analyze(ast, source_columns);
SelectQueryExpressionAnalyzer analyzer(ast, syntax_result, context);
ExpressionActionsChain chain(context);
analyzer.appendSelect(chain, false);
analyzer.appendProjectResult(chain);
chain.finalize();
ExpressionActionsPtr expression = chain.getLastActions();
StoragePtr table = StorageSystemNumbers::create(StorageID("test", "numbers"), false);
Names column_names;
column_names.push_back("number");
QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);
BlockInputStreamPtr in;
in = std::make_shared<TreeExecutorBlockInputStream>(std::move(table->read(column_names, {}, context, stage, 8192, 1)[0]));
in = std::make_shared<ExpressionBlockInputStream>(in, expression);
in = std::make_shared<LimitBlockInputStream>(in, 10, std::max(static_cast<Int64>(0), static_cast<Int64>(n) - 10));
WriteBufferFromOStream out1(std::cout);
BlockOutputStreamPtr out = FormatFactory::instance().getOutput("TabSeparated", out1, expression->getSampleBlock(), context);
{
Stopwatch stopwatch;
stopwatch.start();
copyData(*in, *out);
stopwatch.stop();
std::cout << std::fixed << std::setprecision(2)
<< "Elapsed " << stopwatch.elapsedSeconds() << " sec."
<< ", " << n / stopwatch.elapsedSeconds() << " rows/sec."
<< std::endl;
}
return 0;
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl;
throw;
}

View File

@ -1,89 +0,0 @@
#include <iostream>
#include <iomanip>
#include <IO/WriteBufferFromOStream.h>
#include <IO/ReadHelpers.h>
#include <Storages/System/StorageSystemNumbers.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <Formats/FormatFactory.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypesNumber.h>
#include <Parsers/ParserSelectQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Interpreters/SyntaxAnalyzer.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/Context.h>
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
int main(int argc, char ** argv)
try
{
using namespace DB;
size_t n = argc == 2 ? parse<UInt64>(argv[1]) : 10ULL;
std::string input = "SELECT number, number % 3 == 1";
ParserSelectQuery parser;
ASTPtr ast = parseQuery(parser, input.data(), input.data() + input.size(), "", 0, 0);
formatAST(*ast, std::cerr);
std::cerr << std::endl;
SharedContextHolder shared_context = Context::createShared();
Context context = Context::createGlobal(shared_context.get());
context.makeGlobalContext();
NamesAndTypesList source_columns = {{"number", std::make_shared<DataTypeUInt64>()}};
auto syntax_result = SyntaxAnalyzer(context).analyze(ast, source_columns);
SelectQueryExpressionAnalyzer analyzer(ast, syntax_result, context);
ExpressionActionsChain chain(context);
analyzer.appendSelect(chain, false);
analyzer.appendProjectResult(chain);
chain.finalize();
ExpressionActionsPtr expression = chain.getLastActions();
StoragePtr table = StorageSystemNumbers::create(StorageID("test", "numbers"), false);
Names column_names;
column_names.push_back("number");
QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);
BlockInputStreamPtr in = std::make_shared<TreeExecutorBlockInputStream>(std::move(table->read(column_names, {}, context, stage, 8192, 1)[0]));
in = std::make_shared<FilterBlockInputStream>(in, expression, "equals(modulo(number, 3), 1)");
in = std::make_shared<LimitBlockInputStream>(in, 10, std::max(static_cast<Int64>(0), static_cast<Int64>(n) - 10));
WriteBufferFromOStream ob(std::cout);
BlockOutputStreamPtr out = FormatFactory::instance().getOutput("TabSeparated", ob, expression->getSampleBlock(), context);
{
Stopwatch stopwatch;
stopwatch.start();
copyData(*in, *out);
stopwatch.stop();
std::cout << std::fixed << std::setprecision(2)
<< "Elapsed " << stopwatch.elapsedSeconds() << " sec."
<< ", " << n / stopwatch.elapsedSeconds() << " rows/sec."
<< std::endl;
}
return 0;
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl;
throw;
}

View File

@ -1,66 +0,0 @@
#include <iostream>
#include <iomanip>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <Storages/System/StorageSystemNumbers.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/AsynchronousBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
#include <Interpreters/loadMetadata.h>
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
using namespace DB;
int main(int, char **)
try
{
SharedContextHolder shared_context = Context::createShared();
Context context = Context::createGlobal(shared_context.get());
context.makeGlobalContext();
Settings settings = context.getSettings();
context.setPath("./");
loadMetadata(context);
Names column_names;
column_names.push_back("WatchID");
StoragePtr table = DatabaseCatalog::instance().getTable({"default", "hits6"}, context);
QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);
auto pipes = table->read(column_names, {}, context, stage, settings.max_block_size, settings.max_threads);
BlockInputStreams streams(pipes.size());
for (size_t i = 0, size = streams.size(); i < size; ++i)
streams[i] = std::make_shared<AsynchronousBlockInputStream>(std::make_shared<TreeExecutorBlockInputStream>(std::move(pipes[i])));
BlockInputStreamPtr stream = std::make_shared<UnionBlockInputStream>(streams, nullptr, settings.max_threads);
stream = std::make_shared<LimitBlockInputStream>(stream, 10, 0);
WriteBufferFromFileDescriptor wb(STDERR_FILENO);
Block sample = table->getSampleBlock();
BlockOutputStreamPtr out = context.getOutputFormat("TabSeparated", wb, sample);
copyData(*stream, *out);
return 0;
}
catch (const Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl
<< std::endl
<< "Stack trace:" << std::endl
<< e.getStackTraceString();
return 1;
}

View File

@ -1,15 +1,3 @@
add_executable (expression expression.cpp)
target_link_libraries (expression PRIVATE dbms clickhouse_parsers)
add_executable (create_query create_query.cpp)
target_link_libraries (create_query PRIVATE dbms clickhouse_parsers)
add_executable (select_query select_query.cpp)
target_link_libraries (select_query PRIVATE clickhouse_storages_system dbms clickhouse_common_io)
add_executable (aggregate aggregate.cpp)
target_link_libraries (aggregate PRIVATE dbms)
add_executable (hash_map hash_map.cpp) add_executable (hash_map hash_map.cpp)
target_include_directories (hash_map SYSTEM BEFORE PRIVATE ${SPARSEHASH_INCLUDE_DIR}) target_include_directories (hash_map SYSTEM BEFORE PRIVATE ${SPARSEHASH_INCLUDE_DIR})
target_link_libraries (hash_map PRIVATE dbms) target_link_libraries (hash_map PRIVATE dbms)

View File

@ -1,105 +0,0 @@
#include <iostream>
#include <iomanip>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Interpreters/Aggregator.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
int main(int argc, char ** argv)
{
using namespace DB;
try
{
size_t n = argc == 2 ? std::stol(argv[1]) : 10;
Block block;
{
ColumnWithTypeAndName column;
column.name = "x";
column.type = std::make_shared<DataTypeInt16>();
auto col = ColumnInt16::create();
auto & vec_x = col->getData();
vec_x.resize(n);
for (size_t i = 0; i < n; ++i)
vec_x[i] = i % 9;
column.column = std::move(col);
block.insert(column);
}
const char * strings[] = {"abc", "def", "abcd", "defg", "ac"};
{
ColumnWithTypeAndName column;
column.name = "s1";
column.type = std::make_shared<DataTypeString>();
auto col = ColumnString::create();
for (size_t i = 0; i < n; ++i)
col->insert(std::string(strings[i % 5]));
column.column = std::move(col);
block.insert(column);
}
{
ColumnWithTypeAndName column;
column.name = "s2";
column.type = std::make_shared<DataTypeString>();
auto col = ColumnString::create();
for (size_t i = 0; i < n; ++i)
col->insert(std::string(strings[i % 3]));
column.column = std::move(col);
block.insert(column);
}
BlockInputStreamPtr stream = std::make_shared<OneBlockInputStream>(block);
AggregatedDataVariants aggregated_data_variants;
AggregateFunctionFactory factory;
AggregateDescriptions aggregate_descriptions(1);
DataTypes empty_list_of_types;
aggregate_descriptions[0].function = factory.get("count", empty_list_of_types);
Aggregator::Params params(
stream->getHeader(), {0, 1}, aggregate_descriptions,
false, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, 1, 0);
Aggregator aggregator(params);
{
Stopwatch stopwatch;
stopwatch.start();
aggregator.execute(stream, aggregated_data_variants);
stopwatch.stop();
std::cout << std::fixed << std::setprecision(2)
<< "Elapsed " << stopwatch.elapsedSeconds() << " sec."
<< ", " << n / stopwatch.elapsedSeconds() << " rows/sec."
<< std::endl;
}
}
catch (const Exception & e)
{
std::cerr << e.displayText() << std::endl;
}
return 0;
}

View File

@ -1,103 +0,0 @@
#include <iostream>
#include <iomanip>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Databases/DatabaseOrdinary.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
using namespace DB;
int main(int, char **)
try
{
std::string input = "CREATE TABLE IF NOT EXISTS hits (\n"
"WatchID UInt64,\n"
"JavaEnable UInt8,\n"
"Title String,\n"
"EventTime DateTime,\n"
"CounterID UInt32,\n"
"ClientIP UInt32,\n"
"RegionID UInt32,\n"
"UniqID UInt64,\n"
"CounterClass UInt8,\n"
"OS UInt8,\n"
"UserAgent UInt8,\n"
"URL String,\n"
"Referer String,\n"
"ResolutionWidth UInt16,\n"
"ResolutionHeight UInt16,\n"
"ResolutionDepth UInt8,\n"
"FlashMajor UInt8,\n"
"FlashMinor UInt8,\n"
"FlashMinor2 String,\n"
"NetMajor UInt8,\n"
"NetMinor UInt8,\n"
"UserAgentMajor UInt16,\n"
"UserAgentMinor FixedString(2),\n"
"CookieEnable UInt8,\n"
"JavascriptEnable UInt8,\n"
"IsMobile UInt8,\n"
"MobilePhone UInt8,\n"
"MobilePhoneModel String,\n"
"Params String,\n"
"IPNetworkID UInt32,\n"
"TraficSourceID Int8,\n"
"SearchEngineID UInt16,\n"
"SearchPhrase String,\n"
"AdvEngineID UInt8,\n"
"IsArtifical UInt8,\n"
"WindowClientWidth UInt16,\n"
"WindowClientHeight UInt16,\n"
"ClientTimeZone Int16,\n"
"ClientEventTime DateTime,\n"
"SilverlightVersion1 UInt8,\n"
"SilverlightVersion2 UInt8,\n"
"SilverlightVersion3 UInt32,\n"
"SilverlightVersion4 UInt16,\n"
"PageCharset String,\n"
"CodeVersion UInt32,\n"
"IsLink UInt8,\n"
"IsDownload UInt8,\n"
"IsNotBounce UInt8,\n"
"FUniqID UInt64,\n"
"OriginalURL String,\n"
"HID UInt32,\n"
"IsOldCounter UInt8,\n"
"IsEvent UInt8,\n"
"IsParameter UInt8,\n"
"DontCountHits UInt8,\n"
"WithHash UInt8\n"
") ENGINE = Log";
ParserCreateQuery parser;
ASTPtr ast = parseQuery(parser, input.data(), input.data() + input.size(), "", 0, 0);
SharedContextHolder shared_context = Context::createShared();
Context context = Context::createGlobal(shared_context.get());
context.makeGlobalContext();
context.setPath("./");
auto database = std::make_shared<DatabaseOrdinary>("test", "./metadata/test/", context);
DatabaseCatalog::instance().attachDatabase("test", database);
database->loadStoredObjects(context, false);
context.setCurrentDatabase("test");
InterpreterCreateQuery interpreter(ast, context);
interpreter.execute();
return 0;
}
catch (const Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl
<< std::endl
<< "Stack trace:" << std::endl
<< e.getStackTraceString();
return 1;
}

View File

@ -1,140 +0,0 @@
#include <iostream>
#include <iomanip>
#include <IO/WriteBufferFromOStream.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ParserSelectQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Formats/FormatFactory.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <Interpreters/SyntaxAnalyzer.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/Context.h>
int main(int argc, char ** argv)
{
using namespace DB;
try
{
std::string input = "SELECT x, s1, s2, "
"/*"
"2 + x * 2, x * 2, x % 3 == 1, "
"s1 == 'abc', s1 == s2, s1 != 'abc', s1 != s2, "
"s1 < 'abc', s1 < s2, s1 > 'abc', s1 > s2, "
"s1 <= 'abc', s1 <= s2, s1 >= 'abc', s1 >= s2, "
"*/"
"s1 < s2 AND x % 3 < x % 5";
ParserSelectQuery parser;
ASTPtr ast = parseQuery(parser, input.data(), input.data() + input.size(), "", 0, 0);
formatAST(*ast, std::cerr);
std::cerr << std::endl;
SharedContextHolder shared_context = Context::createShared();
Context context = Context::createGlobal(shared_context.get());
context.makeGlobalContext();
NamesAndTypesList columns
{
{"x", std::make_shared<DataTypeInt16>()},
{"s1", std::make_shared<DataTypeString>()},
{"s2", std::make_shared<DataTypeString>()}
};
auto syntax_result = SyntaxAnalyzer(context).analyze(ast, columns);
SelectQueryExpressionAnalyzer analyzer(ast, syntax_result, context);
ExpressionActionsChain chain(context);
analyzer.appendSelect(chain, false);
analyzer.appendProjectResult(chain);
chain.finalize();
ExpressionActionsPtr expression = chain.getLastActions();
size_t n = argc == 2 ? std::stol(argv[1]) : 10;
Block block;
{
ColumnWithTypeAndName column;
column.name = "x";
column.type = std::make_shared<DataTypeInt16>();
auto col = ColumnInt16::create();
auto & vec_x = col->getData();
vec_x.resize(n);
for (size_t i = 0; i < n; ++i)
vec_x[i] = i % 9;
column.column = std::move(col);
block.insert(column);
}
const char * strings[] = {"abc", "def", "abcd", "defg", "ac"};
{
ColumnWithTypeAndName column;
column.name = "s1";
column.type = std::make_shared<DataTypeString>();
auto col = ColumnString::create();
for (size_t i = 0; i < n; ++i)
col->insert(std::string(strings[i % 5]));
column.column = std::move(col);
block.insert(column);
}
{
ColumnWithTypeAndName column;
column.name = "s2";
column.type = std::make_shared<DataTypeString>();
auto col = ColumnString::create();
for (size_t i = 0; i < n; ++i)
col->insert(std::string(strings[i % 3]));
column.column = std::move(col);
block.insert(column);
}
{
Stopwatch stopwatch;
stopwatch.start();
expression->execute(block);
stopwatch.stop();
std::cout << std::fixed << std::setprecision(2)
<< "Elapsed " << stopwatch.elapsedSeconds() << " sec."
<< ", " << n / stopwatch.elapsedSeconds() << " rows/sec."
<< std::endl;
}
auto is = std::make_shared<OneBlockInputStream>(block);
LimitBlockInputStream lis(is, 20, std::max(0, static_cast<int>(n) - 20));
WriteBufferFromOStream out_buf(std::cout);
BlockOutputStreamPtr out = FormatFactory::instance().getOutput("TabSeparated", out_buf, block, context);
copyData(lis, *out);
}
catch (const Exception & e)
{
std::cerr << e.displayText() << std::endl;
}
return 0;
}

View File

@ -1,61 +0,0 @@
#include <iostream>
#include <iomanip>
#include <common/DateLUT.h>
#include <Poco/ConsoleChannel.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <Storages/StorageLog.h>
#include <Storages/System/attachSystemTables.h>
#include <Interpreters/Context.h>
#include <Interpreters/loadMetadata.h>
#include <Interpreters/executeQuery.h>
#include <Databases/IDatabase.h>
#include <Databases/DatabaseOrdinary.h>
using namespace DB;
int main(int, char **)
try
{
Poco::AutoPtr<Poco::ConsoleChannel> channel = new Poco::ConsoleChannel(std::cerr);
Poco::Logger::root().setChannel(channel);
Poco::Logger::root().setLevel("trace");
/// Pre-initialize the `DateLUT` so that the first initialization does not affect the measured execution speed.
DateLUT::instance();
SharedContextHolder shared_context = Context::createShared();
Context context = Context::createGlobal(shared_context.get());
context.makeGlobalContext();
context.setPath("./");
loadMetadata(context);
DatabasePtr system = std::make_shared<DatabaseOrdinary>("system", "./metadata/system/", context);
DatabaseCatalog::instance().attachDatabase("system", system);
system->loadStoredObjects(context, false);
attachSystemTablesLocal(*DatabaseCatalog::instance().getSystemDatabase());
context.setCurrentDatabase("default");
ReadBufferFromFileDescriptor in(STDIN_FILENO);
WriteBufferFromFileDescriptor out(STDOUT_FILENO);
executeQuery(in, out, /* allow_into_outfile = */ false, context, {});
return 0;
}
catch (const Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl
<< std::endl
<< "Stack trace:" << std::endl
<< e.getStackTraceString();
return 1;
}

View File

@ -1,15 +0,0 @@
add_executable (processors_test processors_test.cpp)
add_executable (processors_test_chain processors_test_chain.cpp)
add_executable (processors_test_merge processors_test_merge.cpp)
add_executable (processors_test_merging_sorted_transform processors_test_merging_sorted_transform.cpp)
add_executable (processors_test_merge_sorting_transform processors_test_merge_sorting_transform.cpp)
add_executable (processors_test_expand_pipeline processors_test_expand_pipeline.cpp)
add_executable (processors_test_aggregation processors_test_aggregation.cpp)
target_link_libraries (processors_test PRIVATE dbms)
target_link_libraries (processors_test_chain PRIVATE dbms)
target_link_libraries (processors_test_merge PRIVATE dbms)
target_link_libraries (processors_test_expand_pipeline PRIVATE dbms)
target_link_libraries (processors_test_merging_sorted_transform PRIVATE dbms)
target_link_libraries (processors_test_merge_sorting_transform PRIVATE dbms)
target_link_libraries (processors_test_aggregation PRIVATE dbms clickhouse_aggregate_functions)

View File

@ -1,228 +0,0 @@
#include <iostream>
#include <thread>
#include <atomic>
#include <Processors/IProcessor.h>
#include <Processors/ISource.h>
#include <Processors/ISink.h>
#include <Processors/ResizeProcessor.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/ForkProcessor.h>
#include <Processors/LimitTransform.h>
#include <Processors/QueueBuffer.h>
#include <Processors/printPipeline.h>
#include <Columns/ColumnsNumber.h>
#include <Common/ThreadPool.h>
#include <Common/EventCounter.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromOStream.h>
#include <Processors/Executors/PipelineExecutor.h>
using namespace DB;
class NumbersSource : public ISource
{
public:
String getName() const override { return "Numbers"; }
NumbersSource(UInt64 start_number, unsigned sleep_useconds_)
: ISource(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
current_number(start_number), sleep_useconds(sleep_useconds_)
{
}
private:
UInt64 current_number = 0;
unsigned sleep_useconds;
Chunk generate() override
{
usleep(sleep_useconds);
MutableColumns columns;
columns.emplace_back(ColumnUInt64::create(1, current_number));
++current_number;
return Chunk(std::move(columns), 1);
}
};
class SleepyNumbersSource : public IProcessor
{
protected:
OutputPort & output;
public:
String getName() const override { return "SleepyNumbers"; }
SleepyNumbersSource(UInt64 start_number, unsigned sleep_useconds_)
: IProcessor({}, {Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})})
, output(outputs.front()), current_number(start_number), sleep_useconds(sleep_useconds_)
{
}
Status prepare() override
{
if (active)
return Status::Wait;
if (output.isFinished())
return Status::Finished;
if (!output.canPush())
return Status::PortFull;
if (!current_chunk)
return Status::Async;
output.push(std::move(current_chunk));
return Status::Async;
}
void schedule(EventCounter & watch) override
{
active = true;
pool.scheduleOrThrowOnError([&watch, this]
{
usleep(sleep_useconds);
current_chunk = generate();
active = false;
watch.notify();
});
}
OutputPort & getPort() { return output; }
private:
ThreadPool pool{1, 1, 0};
Chunk current_chunk;
std::atomic_bool active {false};
UInt64 current_number = 0;
unsigned sleep_useconds;
Chunk generate()
{
MutableColumns columns;
columns.emplace_back(ColumnUInt64::create(1, current_number));
++current_number;
return Chunk(std::move(columns), 1);
}
};
class PrintSink : public ISink
{
public:
String getName() const override { return "Print"; }
explicit PrintSink(String prefix_)
: ISink(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
prefix(std::move(prefix_))
{
}
private:
String prefix;
WriteBufferFromFileDescriptor out{STDOUT_FILENO};
FormatSettings settings;
void consume(Chunk chunk) override
{
size_t rows = chunk.getNumRows();
size_t columns = chunk.getNumColumns();
for (size_t row_num = 0; row_num < rows; ++row_num)
{
writeString(prefix, out);
for (size_t column_num = 0; column_num < columns; ++column_num)
{
if (column_num != 0)
writeChar('\t', out);
getPort().getHeader().getByPosition(column_num).type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings);
}
writeChar('\n', out);
}
out.next();
}
};
int main(int, char **)
try
{
auto source0 = std::make_shared<NumbersSource>(0, 300000);
auto header = source0->getPort().getHeader();
auto limit0 = std::make_shared<LimitTransform>(header, 10, 0);
connect(source0->getPort(), limit0->getInputPort());
auto queue = std::make_shared<QueueBuffer>(header);
connect(limit0->getOutputPort(), queue->getInputPort());
auto source1 = std::make_shared<SleepyNumbersSource>(100, 100000);
auto source2 = std::make_shared<SleepyNumbersSource>(1000, 200000);
auto source3 = std::make_shared<NumbersSource>(10, 100000);
auto limit3 = std::make_shared<LimitTransform>(header, 5, 0);
connect(source3->getPort(), limit3->getInputPort());
auto source4 = std::make_shared<NumbersSource>(10, 100000);
auto limit4 = std::make_shared<LimitTransform>(header, 5, 0);
connect(source4->getPort(), limit4->getInputPort());
auto concat = std::make_shared<ConcatProcessor>(header, 2);
connect(limit3->getOutputPort(), concat->getInputs().front());
connect(limit4->getOutputPort(), concat->getInputs().back());
auto fork = std::make_shared<ForkProcessor>(header, 2);
connect(concat->getOutputPort(), fork->getInputPort());
auto print_after_concat = std::make_shared<PrintSink>("---------- ");
connect(fork->getOutputs().back(), print_after_concat->getPort());
auto resize = std::make_shared<ResizeProcessor>(header, 4, 1);
auto input_it = resize->getInputs().begin();
connect(queue->getOutputPort(), *(input_it++));
connect(source1->getPort(), *(input_it++));
connect(source2->getPort(), *(input_it++));
connect(fork->getOutputs().front(), *(input_it++));
auto limit = std::make_shared<LimitTransform>(header, 100, 0);
connect(resize->getOutputs().front(), limit->getInputPort());
auto sink = std::make_shared<PrintSink>("");
connect(limit->getOutputPort(), sink->getPort());
WriteBufferFromOStream out(std::cout);
std::vector<ProcessorPtr> processors = {source0, source1, source2, source3, source4, limit0, limit3, limit4, limit,
queue, concat, fork, print_after_concat, resize, sink};
printPipeline(processors, out);
// ThreadPool pool(4, 4, 10);
PipelineExecutor executor(processors);
/// SequentialPipelineExecutor executor({sink});
executor.execute(1);
return 0;
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(true) << '\n';
throw;
}

View File

@ -1,411 +0,0 @@
#include <iostream>
#include <thread>
#include <atomic>
#include <Processors/IProcessor.h>
#include <Processors/ISource.h>
#include <Processors/ISink.h>
#include <Processors/ResizeProcessor.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/ForkProcessor.h>
#include <Processors/LimitTransform.h>
#include <Processors/QueueBuffer.h>
#include <Processors/printPipeline.h>
#include <Columns/ColumnsNumber.h>
#include <Common/ThreadPool.h>
#include <Common/EventCounter.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromOStream.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <DataTypes/DataTypeFactory.h>
#include <Processors/Transforms/MergingAggregatedTransform.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Disks/StoragePolicy.h>
#include <Disks/DiskLocal.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/AutoPtr.h>
#include <Common/CurrentThread.h>
#include <Poco/Path.h>
using namespace DB;
namespace DB::ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class NumbersSource : public ISource
{
public:
String getName() const override { return "Numbers"; }
NumbersSource(UInt64 start_number, UInt64 step_, UInt64 block_size_, unsigned sleep_useconds_)
: ISource(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
current_number(start_number), step(step_), block_size(block_size_), sleep_useconds(sleep_useconds_)
{
}
private:
UInt64 current_number = 0;
UInt64 step;
UInt64 block_size;
unsigned sleep_useconds;
Chunk generate() override
{
usleep(sleep_useconds);
MutableColumns columns;
columns.emplace_back(ColumnUInt64::create());
for (UInt64 i = 0; i < block_size; ++i, current_number += step)
columns.back()->insert(Field(current_number));
return Chunk(std::move(columns), block_size);
}
};
class PrintSink : public ISink
{
public:
String getName() const override { return "Print"; }
PrintSink(String prefix_, Block header)
: ISink(std::move(header)),
prefix(std::move(prefix_))
{
}
private:
String prefix;
WriteBufferFromFileDescriptor out{STDOUT_FILENO};
FormatSettings settings;
void consume(Chunk chunk) override
{
size_t rows = chunk.getNumRows();
size_t columns = chunk.getNumColumns();
for (size_t row_num = 0; row_num < rows; ++row_num)
{
writeString(prefix, out);
for (size_t column_num = 0; column_num < columns; ++column_num)
{
if (column_num != 0)
writeChar('\t', out);
getPort().getHeader().getByPosition(column_num).type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings);
}
writeChar('\n', out);
}
out.next();
}
};
class CheckSink : public ISink
{
public:
String getName() const override { return "Check"; }
CheckSink(Block header, size_t num_rows)
: ISink(std::move(header)), read_rows(num_rows, false)
{
}
void checkAllRead()
{
for (size_t i = 0; i < read_rows.size(); ++i)
{
if (!read_rows[i])
{
throw Exception("Check Failed. Row " + toString(i) + " was not read.", ErrorCodes::LOGICAL_ERROR);
}
}
}
private:
std::vector<bool> read_rows;
void consume(Chunk chunk) override
{
size_t rows = chunk.getNumRows();
size_t columns = chunk.getNumColumns();
for (size_t row_num = 0; row_num < rows; ++row_num)
{
std::vector<UInt64> values(columns);
for (size_t column_num = 0; column_num < columns; ++column_num)
{
values[column_num] = chunk.getColumns()[column_num]->getUInt(row_num);
}
if (values.size() >= 2 && 3 * values[0] != values[1])
throw Exception("Check Failed. Got (" + toString(values[0]) + ", " + toString(values[1]) + ") in result,"
+ "but " + toString(values[0]) + " * 3 != " + toString(values[1]),
ErrorCodes::LOGICAL_ERROR);
if (values[0] >= read_rows.size())
throw Exception("Check Failed. Got string with number " + toString(values[0]) +
" (max " + toString(read_rows.size()), ErrorCodes::LOGICAL_ERROR);
if (read_rows[values[0]])
throw Exception("Row " + toString(values[0]) + " was already read.", ErrorCodes::LOGICAL_ERROR);
read_rows[values[0]] = true;
}
}
};
template<typename TimeT = std::chrono::milliseconds>
struct Measure
{
template<typename F, typename ...Args>
static typename TimeT::rep execution(F&& func, Args&&... args)
{
auto start = std::chrono::steady_clock::now();
std::forward<decltype(func)>(func)(std::forward<Args>(args)...);
auto duration = std::chrono::duration_cast< TimeT>
(std::chrono::steady_clock::now() - start);
return duration.count();
}
};
int main(int, char **)
try
{
ThreadStatus thread_status;
CurrentThread::initializeQuery();
auto thread_group = CurrentThread::getGroup();
Poco::AutoPtr<Poco::ConsoleChannel> channel = new Poco::ConsoleChannel(std::cerr);
Poco::Logger::root().setChannel(channel);
Poco::Logger::root().setLevel("trace");
registerAggregateFunctions();
auto & factory = AggregateFunctionFactory::instance();
auto cur_path = Poco::Path().absolute().toString();
auto disk = std::make_shared<DiskLocal>("tmp", cur_path, 0);
auto tmp_volume = std::make_shared<VolumeJBOD>("tmp", std::vector<DiskPtr>{disk}, 0);
auto execute_one_stream = [&](String msg, size_t num_threads, bool two_level, bool external)
{
std::cerr << '\n' << msg << "\n";
size_t num_rows = 1000000;
size_t block_size = 1000;
auto source1 = std::make_shared<NumbersSource>(0, 1, block_size, 0);
auto source2 = std::make_shared<NumbersSource>(0, 1, block_size, 0);
auto source3 = std::make_shared<NumbersSource>(0, 1, block_size, 0);
auto limit1 = std::make_shared<LimitTransform>(source1->getPort().getHeader(), num_rows, 0);
auto limit2 = std::make_shared<LimitTransform>(source2->getPort().getHeader(), num_rows, 0);
auto limit3 = std::make_shared<LimitTransform>(source3->getPort().getHeader(), num_rows, 0);
auto resize = std::make_shared<ResizeProcessor>(source1->getPort().getHeader(), 3, 1);
AggregateDescriptions aggregate_descriptions(1);
DataTypes sum_types = { std::make_shared<DataTypeUInt64>() };
aggregate_descriptions[0].function = factory.get("sum", sum_types);
aggregate_descriptions[0].arguments = {0};
bool overflow_row = false; /// Without overflow row.
size_t max_rows_to_group_by = 0; /// All.
size_t group_by_two_level_threshold = two_level ? 10 : 0;
size_t group_by_two_level_threshold_bytes = two_level ? 128 : 0;
size_t max_bytes_before_external_group_by = external ? 10000000 : 0;
Aggregator::Params params(
source1->getPort().getHeader(),
{0},
aggregate_descriptions,
overflow_row,
max_rows_to_group_by,
OverflowMode::THROW,
group_by_two_level_threshold,
group_by_two_level_threshold_bytes,
max_bytes_before_external_group_by,
false, /// empty_result_for_aggregation_by_empty_set
tmp_volume,
1, /// max_threads
0
);
auto agg_params = std::make_shared<AggregatingTransformParams>(params, /* final =*/ false);
auto merge_params = std::make_shared<AggregatingTransformParams>(params, /* final =*/ true);
auto aggregating = std::make_shared<AggregatingTransform>(source1->getPort().getHeader(), agg_params);
auto merging = std::make_shared<MergingAggregatedTransform>(aggregating->getOutputs().front().getHeader(), merge_params, 4);
auto sink = std::make_shared<CheckSink>(merging->getOutputPort().getHeader(), num_rows);
connect(source1->getPort(), limit1->getInputPort());
connect(source2->getPort(), limit2->getInputPort());
connect(source3->getPort(), limit3->getInputPort());
auto it = resize->getInputs().begin();
connect(limit1->getOutputPort(), *(it++));
connect(limit2->getOutputPort(), *(it++));
connect(limit3->getOutputPort(), *(it++));
connect(resize->getOutputs().front(), aggregating->getInputs().front());
connect(aggregating->getOutputs().front(), merging->getInputPort());
connect(merging->getOutputPort(), sink->getPort());
std::vector<ProcessorPtr> processors = {source1, source2, source3,
limit1, limit2, limit3,
resize, aggregating, merging, sink};
// WriteBufferFromOStream out(std::cout);
// printPipeline(processors, out);
PipelineExecutor executor(processors);
executor.execute(num_threads);
sink->checkAllRead();
};
auto execute_mult_streams = [&](String msg, size_t num_threads, bool two_level, bool external)
{
std::cerr << '\n' << msg << "\n";
size_t num_rows = 1000000;
size_t block_size = 1000;
auto source1 = std::make_shared<NumbersSource>(0, 1, block_size, 0);
auto source2 = std::make_shared<NumbersSource>(0, 1, block_size, 0);
auto source3 = std::make_shared<NumbersSource>(0, 1, block_size, 0);
auto limit1 = std::make_shared<LimitTransform>(source1->getPort().getHeader(), num_rows, 0);
auto limit2 = std::make_shared<LimitTransform>(source2->getPort().getHeader(), num_rows, 0);
auto limit3 = std::make_shared<LimitTransform>(source3->getPort().getHeader(), num_rows, 0);
AggregateDescriptions aggregate_descriptions(1);
DataTypes sum_types = { std::make_shared<DataTypeUInt64>() };
aggregate_descriptions[0].function = factory.get("sum", sum_types);
aggregate_descriptions[0].arguments = {0};
bool overflow_row = false; /// Without overflow row.
size_t max_rows_to_group_by = 0; /// All.
size_t group_by_two_level_threshold = two_level ? 10 : 0;
size_t group_by_two_level_threshold_bytes = two_level ? 128 : 0;
size_t max_bytes_before_external_group_by = external ? 10000000 : 0;
Aggregator::Params params(
source1->getPort().getHeader(),
{0},
aggregate_descriptions,
overflow_row,
max_rows_to_group_by,
OverflowMode::THROW,
group_by_two_level_threshold,
group_by_two_level_threshold_bytes,
max_bytes_before_external_group_by,
false, /// empty_result_for_aggregation_by_empty_set
tmp_volume,
1, /// max_threads
0
);
auto agg_params = std::make_shared<AggregatingTransformParams>(params, /* final =*/ false);
auto merge_params = std::make_shared<AggregatingTransformParams>(params, /* final =*/ true);
ManyAggregatedDataPtr data = std::make_unique<ManyAggregatedData>(3);
auto aggregating1 = std::make_shared<AggregatingTransform>(source1->getPort().getHeader(), agg_params, data, 0, 4, 4);
auto aggregating2 = std::make_shared<AggregatingTransform>(source1->getPort().getHeader(), agg_params, data, 1, 4, 4);
auto aggregating3 = std::make_shared<AggregatingTransform>(source1->getPort().getHeader(), agg_params, data, 2, 4, 4);
Processors merging_pipe = createMergingAggregatedMemoryEfficientPipe(
aggregating1->getOutputs().front().getHeader(),
merge_params,
3, 2);
auto sink = std::make_shared<CheckSink>(merging_pipe.back()->getOutputs().back().getHeader(), num_rows);
connect(source1->getPort(), limit1->getInputPort());
connect(source2->getPort(), limit2->getInputPort());
connect(source3->getPort(), limit3->getInputPort());
connect(limit1->getOutputPort(), aggregating1->getInputs().front());
connect(limit2->getOutputPort(), aggregating2->getInputs().front());
connect(limit3->getOutputPort(), aggregating3->getInputs().front());
auto it = merging_pipe.front()->getInputs().begin();
connect(aggregating1->getOutputs().front(), *(it++));
connect(aggregating2->getOutputs().front(), *(it++));
connect(aggregating3->getOutputs().front(), *(it++));
connect(merging_pipe.back()->getOutputs().back(), sink->getPort());
std::vector<ProcessorPtr> processors = {source1, source2, source3,
limit1, limit2, limit3,
aggregating1, aggregating2, aggregating3, sink};
processors.insert(processors.end(), merging_pipe.begin(), merging_pipe.end());
// WriteBufferFromOStream out(std::cout);
// printPipeline(processors, out);
PipelineExecutor executor(processors);
executor.execute(num_threads);
sink->checkAllRead();
};
std::vector<String> messages;
std::vector<Int64> times;
auto exec = [&](auto func, String msg, size_t num_threads, bool two_level, bool external)
{
msg += ", two_level = " + toString(two_level) + ", external = " + toString(external);
Int64 time = 0;
auto wrapper = [&]()
{
ThreadStatus cur_status;
CurrentThread::attachToIfDetached(thread_group);
time = Measure<>::execution(func, msg, num_threads, two_level, external);
};
std::thread thread(wrapper);
thread.join();
messages.emplace_back(msg);
times.emplace_back(time);
};
size_t num_threads = 4;
exec(execute_one_stream, "One stream, single thread", 1, false, false);
exec(execute_one_stream, "One stream, multiple threads", num_threads, false, false);
exec(execute_mult_streams, "Multiple streams, single thread", 1, false, false);
exec(execute_mult_streams, "Multiple streams, multiple threads", num_threads, false, false);
exec(execute_one_stream, "One stream, single thread", 1, true, false);
exec(execute_one_stream, "One stream, multiple threads", num_threads, true, false);
exec(execute_mult_streams, "Multiple streams, single thread", 1, true, false);
exec(execute_mult_streams, "Multiple streams, multiple threads", num_threads, true, false);
exec(execute_one_stream, "One stream, single thread", 1, true, true);
exec(execute_one_stream, "One stream, multiple threads", num_threads, true, true);
exec(execute_mult_streams, "Multiple streams, single thread", 1, true, true);
exec(execute_mult_streams, "Multiple streams, multiple threads", num_threads, true, true);
for (size_t i = 0; i < messages.size(); ++i)
std::cout << messages[i] << " time: " << times[i] << " ms.\n";
return 0;
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(true) << '\n';
throw;
}

View File

@ -1,165 +0,0 @@
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
#include <Processors/IProcessor.h>
#include <Processors/ISource.h>
#include <Processors/ISink.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/LimitTransform.h>
#include <Processors/printPipeline.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/WriteHelpers.h>
#include <Formats/FormatSettings.h>
#include <iostream>
#include <chrono>
using namespace DB;
class NumbersSource : public ISource
{
public:
String getName() const override { return "Numbers"; }
NumbersSource(UInt64 start_number, unsigned sleep_useconds_)
: ISource(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
current_number(start_number), sleep_useconds(sleep_useconds_)
{
}
private:
UInt64 current_number = 0;
unsigned sleep_useconds;
Chunk generate() override
{
usleep(sleep_useconds);
MutableColumns columns;
columns.emplace_back(ColumnUInt64::create(1, current_number));
++current_number;
return Chunk(std::move(columns), 1);
}
};
class SleepyTransform : public ISimpleTransform
{
public:
explicit SleepyTransform(unsigned sleep_useconds_)
: ISimpleTransform(
Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }}),
Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }}),
/*skip_empty_chunks =*/ false)
, sleep_useconds(sleep_useconds_) {}
String getName() const override { return "SleepyTransform"; }
protected:
void transform(Chunk &) override
{
usleep(sleep_useconds);
}
private:
unsigned sleep_useconds;
};
class PrintSink : public ISink
{
public:
String getName() const override { return "Print"; }
explicit PrintSink(String prefix_)
: ISink(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
prefix(std::move(prefix_))
{
}
private:
String prefix;
WriteBufferFromFileDescriptor out{STDOUT_FILENO};
FormatSettings settings;
void consume(Chunk chunk) override
{
size_t rows = chunk.getNumRows();
size_t columns = chunk.getNumColumns();
for (size_t row_num = 0; row_num < rows; ++row_num)
{
writeString(prefix, out);
for (size_t column_num = 0; column_num < columns; ++column_num)
{
if (column_num != 0)
writeChar('\t', out);
getPort().getHeader().getByPosition(column_num).type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings);
}
writeChar('\n', out);
}
out.next();
}
};
template<typename TimeT = std::chrono::milliseconds>
struct Measure
{
template<typename F, typename ...Args>
static typename TimeT::rep execution(F&& func, Args&&... args)
{
auto start = std::chrono::steady_clock::now();
std::forward<decltype(func)>(func)(std::forward<Args>(args)...);
auto duration = std::chrono::duration_cast< TimeT>
(std::chrono::steady_clock::now() - start);
return duration.count();
}
};
int main(int, char **)
try
{
auto execute_chain = [](size_t num_threads)
{
std::cerr << "---------------------\n";
auto source = std::make_shared<NumbersSource>(0, 100000);
auto transform1 = std::make_shared<SleepyTransform>(100000);
auto transform2 = std::make_shared<SleepyTransform>(100000);
auto transform3 = std::make_shared<SleepyTransform>(100000);
auto limit = std::make_shared<LimitTransform>(source->getPort().getHeader(), 20, 0);
auto sink = std::make_shared<PrintSink>("");
connect(source->getPort(), transform1->getInputPort());
connect(transform1->getOutputPort(), transform2->getInputPort());
connect(transform2->getOutputPort(), transform3->getInputPort());
connect(transform3->getOutputPort(), limit->getInputPort());
connect(limit->getOutputPort(), sink->getPort());
std::vector<ProcessorPtr> processors = {source, transform1, transform2, transform3, limit, sink};
// WriteBufferFromOStream out(std::cout);
// printPipeline(processors, out);
PipelineExecutor executor(processors);
executor.execute(num_threads);
};
auto time_single = Measure<>::execution(execute_chain, 1);
auto time_mt = Measure<>::execution(execute_chain, 4);
std::cout << "Single Thread time: " << time_single << " ms.\n";
std::cout << "Multiple Threads time: " << time_mt << " ms.\n";
return 0;
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(true) << '\n';
throw;
}

View File

@ -1,285 +0,0 @@
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
#include <Processors/ISink.h>
#include <Processors/ISource.h>
#include <Processors/LimitTransform.h>
#include <Processors/printPipeline.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/WriteHelpers.h>
#include <Formats/FormatSettings.h>
#include <iostream>
#include <chrono>
#include <Processors/ISimpleTransform.h>
using namespace DB;
class PrintSink : public ISink
{
public:
String getName() const override { return "Print"; }
explicit PrintSink(String prefix_)
: ISink(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
prefix(std::move(prefix_))
{
}
private:
String prefix;
WriteBufferFromFileDescriptor out{STDOUT_FILENO};
FormatSettings settings;
void consume(Chunk chunk) override
{
size_t rows = chunk.getNumRows();
size_t columns = chunk.getNumColumns();
for (size_t row_num = 0; row_num < rows; ++row_num)
{
writeString(prefix, out);
for (size_t column_num = 0; column_num < columns; ++column_num)
{
if (column_num != 0)
writeChar('\t', out);
getPort().getHeader().getByPosition(column_num).type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings);
}
writeChar('\n', out);
}
out.next();
}
};
class OneNumberSource : public ISource
{
public:
String getName() const override { return "OneNumber"; }
explicit OneNumberSource(UInt64 number_)
: ISource(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
number(number_)
{
}
private:
UInt64 number;
bool done = false;
Chunk generate() override
{
if (done)
return Chunk();
done = true;
MutableColumns columns;
columns.emplace_back(ColumnUInt64::create(1, number));
return Chunk(std::move(columns), 1);
}
};
class ExpandingProcessor : public IProcessor
{
public:
String getName() const override { return "Expanding"; }
ExpandingProcessor()
: IProcessor({Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})},
{Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})})
{}
Status prepare() override
{
auto & main_input = inputs.front();
auto & main_output = outputs.front();
auto & additional_input = inputs.back();
auto & additional_output = outputs.back();
/// Check can output.
if (main_output.isFinished())
{
main_input.close();
additional_input.close();
additional_output.finish();
return Status::Finished;
}
if (!main_output.canPush())
{
main_input.setNotNeeded();
additional_input.setNotNeeded();
return Status::PortFull;
}
if (chunk_from_add_inp && is_processed)
{
if (is_processed)
main_output.push(std::move(chunk_from_add_inp));
else
return Status::Ready;
}
if (expanded)
{
if (chunk_from_main_inp)
{
if (additional_output.isFinished())
{
main_input.close();
return Status::Finished;
}
if (!additional_output.canPush())
{
main_input.setNotNeeded();
return Status::PortFull;
}
additional_output.push(std::move(chunk_from_main_inp));
main_input.close();
}
if (additional_input.isFinished())
{
main_output.finish();
return Status::Finished;
}
additional_input.setNeeded();
if (!additional_input.hasData())
return Status::NeedData;
chunk_from_add_inp = additional_input.pull();
is_processed = false;
return Status::Ready;
}
else
{
if (!chunk_from_main_inp)
{
if (main_input.isFinished())
{
main_output.finish();
return Status::Finished;
}
main_input.setNeeded();
if (!main_input.hasData())
return Status::NeedData;
chunk_from_main_inp = main_input.pull();
main_input.close();
}
UInt64 val = chunk_from_main_inp.getColumns()[0]->getUInt(0);
if (val)
{
--val;
chunk_from_main_inp.setColumns(Columns{ColumnUInt64::create(1, val)}, 1);
return Status::ExpandPipeline;
}
main_output.push(std::move(chunk_from_main_inp));
main_output.finish();
return Status::Finished;
}
}
Processors expandPipeline() override
{
auto & main_input = inputs.front();
auto & main_output = outputs.front();
Processors processors = {std::make_shared<ExpandingProcessor>()};
inputs.push_back({main_input.getHeader(), this});
outputs.push_back({main_output.getHeader(), this});
connect(outputs.back(), processors.back()->getInputs().front());
connect(processors.back()->getOutputs().front(), inputs.back());
inputs.back().setNeeded();
expanded = true;
return processors;
}
void work() override
{
auto num_rows = chunk_from_add_inp.getNumRows();
auto columns = chunk_from_add_inp.mutateColumns();
columns.front()->insert(Field(num_rows));
chunk_from_add_inp.setColumns(std::move(columns), num_rows + 1);
is_processed = true;
}
private:
bool expanded = false;
Chunk chunk_from_main_inp;
Chunk chunk_from_add_inp;
bool is_processed = false;
};
template<typename TimeT = std::chrono::milliseconds>
struct Measure
{
template<typename F, typename ...Args>
static typename TimeT::rep execution(F&& func, Args&&... args)
{
auto start = std::chrono::steady_clock::now();
std::forward<decltype(func)>(func)(std::forward<Args>(args)...);
auto duration = std::chrono::duration_cast< TimeT>
(std::chrono::steady_clock::now() - start);
return duration.count();
}
};
int main(int, char **)
try
{
auto execute = [](String msg, size_t num, size_t num_threads)
{
std::cerr << msg << "\n";
auto source = std::make_shared<OneNumberSource>(num);
auto expanding = std::make_shared<ExpandingProcessor>();
auto sink = std::make_shared<PrintSink>("");
connect(source->getPort(), expanding->getInputs().front());
connect(expanding->getOutputs().front(), sink->getPort());
std::vector<ProcessorPtr> processors = {source, expanding, sink};
PipelineExecutor executor(processors);
executor.execute(num_threads);
WriteBufferFromOStream out(std::cout);
printPipeline(executor.getProcessors(), out);
};
ThreadPool pool(4, 4, 10);
auto time_single = Measure<>::execution(execute, "Single thread", 10, 1);
auto time_mt = Measure<>::execution(execute, "Multiple threads", 10, 4);
std::cout << "Single Thread time: " << time_single << " ms.\n";
std::cout << "Multiple Threads time:" << time_mt << " ms.\n";
return 0;
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(true) << '\n';
throw;
}

View File

@ -1,334 +0,0 @@
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
#include <Processors/IProcessor.h>
#include <Processors/ISource.h>
#include <Processors/ISink.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/LimitTransform.h>
#include <Processors/printPipeline.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/WriteHelpers.h>
#include <Formats/FormatSettings.h>
#include <iostream>
#include <chrono>
using namespace DB;
class MergingSortedProcessor : public IProcessor
{
public:
MergingSortedProcessor(const Block & header, size_t num_inputs)
: IProcessor(InputPorts(num_inputs, header), OutputPorts{header})
, chunks(num_inputs), positions(num_inputs, 0), finished(num_inputs, false)
{
}
String getName() const override { return "MergingSortedProcessor"; }
Status prepare() override
{
auto & output = outputs.front();
/// Check can output.
if (output.isFinished())
{
for (auto & in : inputs)
in.close();
return Status::Finished;
}
if (!output.isNeeded())
{
for (auto & in : inputs)
in.setNotNeeded();
return Status::PortFull;
}
if (output.hasData())
return Status::PortFull;
/// Push if has data.
if (res)
{
output.push(std::move(res));
return Status::PortFull;
}
/// Check for inputs we need.
bool all_inputs_finished = true;
bool all_inputs_has_data = true;
auto it = inputs.begin();
for (size_t i = 0; it != inputs.end(); ++it, ++i)
{
auto & input = *it;
if (!finished[i])
{
if (!input.isFinished())
{
all_inputs_finished = false;
bool needed = positions[i] >= chunks[i].getNumRows();
if (needed)
{
input.setNeeded();
if (input.hasData())
{
chunks[i] = input.pull();
positions[i] = 0;
}
else
all_inputs_has_data = false;
}
else
input.setNotNeeded();
}
else
finished[i] = true;
}
}
if (all_inputs_finished)
{
output.finish();
return Status::Finished;
}
if (!all_inputs_has_data)
return Status::NeedData;
return Status::Ready;
}
void work() override
{
using Key = std::pair<UInt64, size_t>;
std::priority_queue<Key, std::vector<Key>, std::greater<>> queue;
for (size_t i = 0; i < chunks.size(); ++i)
{
if (finished[i])
continue;
if (positions[i] >= chunks[i].getNumRows())
return;
queue.push({chunks[i].getColumns()[0]->getUInt(positions[i]), i});
}
auto col = ColumnUInt64::create();
while (!queue.empty())
{
size_t ps = queue.top().second;
queue.pop();
const auto & cur_col = chunks[ps].getColumns()[0];
col->insertFrom(*cur_col, positions[ps]);
++positions[ps];
if (positions[ps] == cur_col->size())
break;
queue.push({cur_col->getUInt(positions[ps]), ps});
}
UInt64 num_rows = col->size();
res.setColumns(Columns({std::move(col)}), num_rows);
}
OutputPort & getOutputPort() { return outputs.front(); }
private:
Chunks chunks;
Chunk res;
std::vector<size_t> positions;
std::vector<bool> finished;
};
class NumbersSource : public ISource
{
public:
String getName() const override { return "Numbers"; }
NumbersSource(UInt64 start_number, UInt64 step_, unsigned sleep_useconds_)
: ISource(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
current_number(start_number), step(step_), sleep_useconds(sleep_useconds_)
{
}
private:
UInt64 current_number = 0;
UInt64 step;
unsigned sleep_useconds;
Chunk generate() override
{
usleep(sleep_useconds);
MutableColumns columns;
columns.emplace_back(ColumnUInt64::create(1, current_number));
current_number += step;
return Chunk(std::move(columns), 1);
}
};
class SleepyTransform : public ISimpleTransform
{
public:
explicit SleepyTransform(unsigned sleep_useconds_)
: ISimpleTransform(
Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }}),
Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }}),
false)
, sleep_useconds(sleep_useconds_) {}
String getName() const override { return "SleepyTransform"; }
protected:
void transform(Chunk &) override
{
usleep(sleep_useconds);
}
private:
unsigned sleep_useconds;
};
class PrintSink : public ISink
{
public:
String getName() const override { return "Print"; }
explicit PrintSink(String prefix_)
: ISink(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
prefix(std::move(prefix_))
{
}
private:
String prefix;
WriteBufferFromFileDescriptor out{STDOUT_FILENO};
FormatSettings settings;
void consume(Chunk chunk) override
{
size_t rows = chunk.getNumRows();
size_t columns = chunk.getNumColumns();
for (size_t row_num = 0; row_num < rows; ++row_num)
{
writeString(prefix, out);
for (size_t column_num = 0; column_num < columns; ++column_num)
{
if (column_num != 0)
writeChar('\t', out);
getPort().getHeader().getByPosition(column_num).type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings);
}
writeChar('\n', out);
}
out.next();
}
};
template<typename TimeT = std::chrono::milliseconds>
struct Measure
{
template<typename F, typename ...Args>
static typename TimeT::rep execution(F&& func, Args&&... args)
{
auto start = std::chrono::steady_clock::now();
std::forward<decltype(func)>(func)(std::forward<Args>(args)...);
auto duration = std::chrono::duration_cast< TimeT>
(std::chrono::steady_clock::now() - start);
return duration.count();
}
};
int main(int, char **)
try
{
auto execute_chain = [](String msg, size_t start1, size_t start2, size_t start3, size_t num_threads)
{
std::cerr << msg << "\n";
auto source1 = std::make_shared<NumbersSource>(start1, 3, 100000);
auto source2 = std::make_shared<NumbersSource>(start2, 3, 100000);
auto source3 = std::make_shared<NumbersSource>(start3, 3, 100000);
auto transform1 = std::make_shared<SleepyTransform>(100000);
auto transform2 = std::make_shared<SleepyTransform>(100000);
auto transform3 = std::make_shared<SleepyTransform>(100000);
auto limit1 = std::make_shared<LimitTransform>(source1->getPort().getHeader(), 20, 0);
auto limit2 = std::make_shared<LimitTransform>(source2->getPort().getHeader(), 20, 0);
auto limit3 = std::make_shared<LimitTransform>(source3->getPort().getHeader(), 20, 0);
auto merge = std::make_shared<MergingSortedProcessor>(source1->getPort().getHeader(), 3);
auto limit_fin = std::make_shared<LimitTransform>(source1->getPort().getHeader(), 54, 0);
auto sink = std::make_shared<PrintSink>("");
connect(source1->getPort(), transform1->getInputPort());
connect(source2->getPort(), transform2->getInputPort());
connect(source3->getPort(), transform3->getInputPort());
connect(transform1->getOutputPort(), limit1->getInputPort());
connect(transform2->getOutputPort(), limit2->getInputPort());
connect(transform3->getOutputPort(), limit3->getInputPort());
auto it = merge->getInputs().begin();
connect(limit1->getOutputPort(), *(it++));
connect(limit2->getOutputPort(), *(it++));
connect(limit3->getOutputPort(), *(it++));
connect(merge->getOutputPort(), limit_fin->getInputPort());
connect(limit_fin->getOutputPort(), sink->getPort());
std::vector<ProcessorPtr> processors = {source1, source2, source3,
transform1, transform2, transform3,
limit1, limit2, limit3,
merge, limit_fin, sink};
// WriteBufferFromOStream out(std::cout);
// printPipeline(processors, out);
PipelineExecutor executor(processors);
executor.execute(num_threads);
};
auto even_time_single = Measure<>::execution(execute_chain, "Even distribution single thread", 0, 1, 2, 1);
auto even_time_mt = Measure<>::execution(execute_chain, "Even distribution multiple threads", 0, 1, 2, 4);
auto half_time_single = Measure<>::execution(execute_chain, "Half distribution single thread", 0, 31, 62, 1);
auto half_time_mt = Measure<>::execution(execute_chain, "Half distribution multiple threads", 0, 31, 62, 4);
auto ordered_time_single = Measure<>::execution(execute_chain, "Ordered distribution single thread", 0, 61, 122, 1);
auto ordered_time_mt = Measure<>::execution(execute_chain, "Ordered distribution multiple threads", 0, 61, 122, 4);
std::cout << "Single Thread [0:60:3] [1:60:3] [2:60:3] time: " << even_time_single << " ms.\n";
std::cout << "Multiple Threads [0:60:3] [1:60:3] [2:60:3] time:" << even_time_mt << " ms.\n";
std::cout << "Single Thread [0:60:3] [31:90:3] [62:120:3] time: " << half_time_single << " ms.\n";
std::cout << "Multiple Threads [0:60:3] [31:90:3] [62:120:3] time: " << half_time_mt << " ms.\n";
std::cout << "Single Thread [0:60:3] [61:120:3] [122:180:3] time: " << ordered_time_single << " ms.\n";
std::cout << "Multiple Threads [0:60:3] [61:120:3] [122:180:3] time: " << ordered_time_mt << " ms.\n";
return 0;
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(true) << '\n';
throw;
}

View File

@ -1,250 +0,0 @@
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
#include <Disks/StoragePolicy.h>
#include <Disks/DiskLocal.h>
#include <Processors/IProcessor.h>
#include <Processors/ISource.h>
#include <Processors/ISink.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/LimitTransform.h>
#include <Processors/printPipeline.h>
#include <Processors/Transforms/MergeSortingTransform.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/WriteHelpers.h>
#include <Formats/FormatSettings.h>
#include <iostream>
#include <chrono>
#include <Poco/ConsoleChannel.h>
#include <Poco/AutoPtr.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class NumbersSource : public ISource
{
public:
String getName() const override { return "Numbers"; }
NumbersSource(UInt64 count_, UInt64 block_size_, unsigned sleep_useconds_)
: ISource(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
count(count_), block_size(block_size_), sleep_useconds(sleep_useconds_)
{
}
private:
UInt64 current_number = 0;
UInt64 count;
UInt64 block_size;
unsigned sleep_useconds;
Chunk generate() override
{
if (current_number == count)
return {};
usleep(sleep_useconds);
MutableColumns columns;
columns.emplace_back(ColumnUInt64::create());
UInt64 number = current_number++;
for (UInt64 i = 0; i < block_size; ++i, number += count)
columns.back()->insert(Field(number));
return Chunk(std::move(columns), block_size);
}
};
class CheckSortedSink : public ISink
{
public:
String getName() const override { return "Print"; }
CheckSortedSink()
: ISink(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }}))
{
}
private:
FormatSettings settings;
UInt64 current_number = 0;
void consume(Chunk chunk) override
{
size_t rows = chunk.getNumRows();
UInt64 prev = current_number;
const auto & col = chunk.getColumns().at(0);
for (size_t row_num = 0; row_num < rows; ++row_num)
{
UInt64 val = col->getUInt(row_num);
if (val != current_number)
throw Exception("Invalid value. Expected " + toString(current_number) + ", got " + toString(val),
ErrorCodes::LOGICAL_ERROR);
++current_number;
}
std::cout << "CheckSortedSink: " << prev << " - " << current_number << std::endl;
}
};
template<typename TimeT = std::chrono::milliseconds>
struct Measure
{
template<typename F, typename ...Args>
static typename TimeT::rep execution(F&& func, Args&&... args)
{
auto start = std::chrono::steady_clock::now();
std::forward<decltype(func)>(func)(std::forward<Args>(args)...);
auto duration = std::chrono::duration_cast< TimeT>
(std::chrono::steady_clock::now() - start);
return duration.count();
}
};
}
using namespace DB;
int main(int, char **)
try
{
Poco::AutoPtr<Poco::ConsoleChannel> channel = new Poco::ConsoleChannel(std::cerr);
Poco::Logger::root().setChannel(channel);
Poco::Logger::root().setLevel("trace");
auto disk = std::make_shared<DiskLocal>("tmp", ".", 0);
auto tmp_volume = std::make_shared<VolumeJBOD>("tmp", std::vector<DiskPtr>{disk}, 0);
auto execute_chain = [tmp_volume](
String msg,
UInt64 source_block_size,
UInt64 blocks_count,
size_t max_merged_block_size,
UInt64 limit,
size_t max_bytes_before_remerge,
size_t max_bytes_before_external_sort,
size_t num_threads)
{
std::cerr << "------------------------\n";
std::cerr << msg << "\n";
auto source = std::make_shared<NumbersSource>(blocks_count, source_block_size, 100);
SortDescription description = {{0, 1, 1}};
auto transform = std::make_shared<MergeSortingTransform>(
source->getPort().getHeader(), description,
max_merged_block_size, limit,
max_bytes_before_remerge, max_bytes_before_external_sort,
tmp_volume, 0);
auto sink = std::make_shared<CheckSortedSink>();
connect(source->getPort(), transform->getInputs().front());
connect(transform->getOutputs().front(), sink->getPort());
std::vector<ProcessorPtr> processors = {source, transform, sink};
PipelineExecutor executor(processors);
executor.execute(num_threads);
WriteBufferFromOStream out(std::cout);
printPipeline(executor.getProcessors(), out);
};
std::map<std::string, Int64> times;
for (size_t num_threads : {1, 4})
{
{
UInt64 source_block_size = 100;
UInt64 blocks_count = 10;
size_t max_merged_block_size = 100;
UInt64 limit = 0;
size_t max_bytes_before_remerge = 10000000;
size_t max_bytes_before_external_sort = 10000000;
std::string msg = num_threads > 1 ? "multiple threads" : "single thread";
msg += ", " + toString(blocks_count) + " blocks per " + toString(source_block_size) + " numbers" +
", no remerge and external sorts.";
Int64 time = Measure<>::execution(execute_chain, msg,
source_block_size,
blocks_count,
max_merged_block_size,
limit,
max_bytes_before_remerge,
max_bytes_before_external_sort,
num_threads);
times[msg] = time;
}
{
UInt64 source_block_size = 1024;
UInt64 blocks_count = 10;
size_t max_merged_block_size = 1024;
UInt64 limit = 2048;
size_t max_bytes_before_remerge = sizeof(UInt64) * source_block_size * 4;
size_t max_bytes_before_external_sort = 10000000;
std::string msg = num_threads > 1 ? "multiple threads" : "single thread";
msg += ", " + toString(blocks_count) + " blocks per " + toString(source_block_size) + " numbers" +
", with remerge, no external sorts.";
Int64 time = Measure<>::execution(execute_chain, msg,
source_block_size,
blocks_count,
max_merged_block_size,
limit,
max_bytes_before_remerge,
max_bytes_before_external_sort,
num_threads);
times[msg] = time;
}
{
UInt64 source_block_size = 1024;
UInt64 blocks_count = 10;
size_t max_merged_block_size = 1024;
UInt64 limit = 0;
size_t max_bytes_before_remerge = 0;
size_t max_bytes_before_external_sort = sizeof(UInt64) * source_block_size * 4;
std::string msg = num_threads > 1 ? "multiple threads" : "single thread";
msg += ", " + toString(blocks_count) + " blocks per " + toString(source_block_size) + " numbers" +
", no remerge, with external sorts.";
Int64 time = Measure<>::execution(execute_chain, msg,
source_block_size,
blocks_count,
max_merged_block_size,
limit,
max_bytes_before_remerge,
max_bytes_before_external_sort,
num_threads);
times[msg] = time;
}
}
for (auto & item : times)
std::cout << item.first << ' ' << item.second << " ms.\n";
return 0;
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(true) << '\n';
throw;
}

View File

@ -1,207 +0,0 @@
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
#include <Processors/IProcessor.h>
#include <Processors/ISource.h>
#include <Processors/ISink.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/LimitTransform.h>
#include <Processors/printPipeline.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/WriteHelpers.h>
#include <Formats/FormatSettings.h>
#include <iostream>
#include <chrono>
using namespace DB;
class NumbersSource : public ISource
{
public:
String getName() const override { return "Numbers"; }
NumbersSource(UInt64 start_number, UInt64 step_, UInt64 block_size_, unsigned sleep_useconds_)
: ISource(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
current_number(start_number), step(step_), block_size(block_size_), sleep_useconds(sleep_useconds_)
{
}
private:
UInt64 current_number = 0;
UInt64 step;
UInt64 block_size;
unsigned sleep_useconds;
Chunk generate() override
{
usleep(sleep_useconds);
MutableColumns columns;
columns.emplace_back(ColumnUInt64::create());
for (UInt64 i = 0; i < block_size; ++i, current_number += step)
columns.back()->insert(Field(current_number));
return Chunk(std::move(columns), block_size);
}
};
class SleepyTransform : public ISimpleTransform
{
public:
explicit SleepyTransform(unsigned sleep_useconds_)
: ISimpleTransform(
Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }}),
Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }}),
false)
, sleep_useconds(sleep_useconds_) {}
String getName() const override { return "SleepyTransform"; }
protected:
void transform(Chunk &) override
{
usleep(sleep_useconds);
}
private:
unsigned sleep_useconds;
};
class PrintSink : public ISink
{
public:
String getName() const override { return "Print"; }
explicit PrintSink(String prefix_)
: ISink(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
prefix(std::move(prefix_))
{
}
private:
String prefix;
WriteBufferFromFileDescriptor out{STDOUT_FILENO};
FormatSettings settings;
void consume(Chunk chunk) override
{
size_t rows = chunk.getNumRows();
size_t columns = chunk.getNumColumns();
for (size_t row_num = 0; row_num < rows; ++row_num)
{
writeString(prefix, out);
for (size_t column_num = 0; column_num < columns; ++column_num)
{
if (column_num != 0)
writeChar('\t', out);
getPort().getHeader().getByPosition(column_num).type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings);
}
writeChar('\n', out);
}
out.next();
}
};
template<typename TimeT = std::chrono::milliseconds>
struct Measure
{
template<typename F, typename ...Args>
static typename TimeT::rep execution(F&& func, Args&&... args)
{
auto start = std::chrono::steady_clock::now();
std::forward<decltype(func)>(func)(std::forward<Args>(args)...);
auto duration = std::chrono::duration_cast< TimeT>
(std::chrono::steady_clock::now() - start);
return duration.count();
}
};
int main(int, char **)
try
{
auto execute_chain = [](String msg, size_t start1, size_t start2, size_t start3, size_t num_threads)
{
std::cerr << msg << "\n";
auto source1 = std::make_shared<NumbersSource>(start1, 3, 2, 100000);
auto source2 = std::make_shared<NumbersSource>(start2, 3, 2, 100000);
auto source3 = std::make_shared<NumbersSource>(start3, 3, 2, 100000);
auto transform1 = std::make_shared<SleepyTransform>(100000);
auto transform2 = std::make_shared<SleepyTransform>(100000);
auto transform3 = std::make_shared<SleepyTransform>(100000);
auto limit1 = std::make_shared<LimitTransform>(source1->getPort().getHeader(), 20, 0);
auto limit2 = std::make_shared<LimitTransform>(source2->getPort().getHeader(), 20, 0);
auto limit3 = std::make_shared<LimitTransform>(source3->getPort().getHeader(), 20, 0);
SortDescription description = {{0, 1, 1}};
auto merge = std::make_shared<MergingSortedTransform>(source1->getPort().getHeader(), 3, description, 2);
auto limit_fin = std::make_shared<LimitTransform>(source1->getPort().getHeader(), 54, 0);
auto sink = std::make_shared<PrintSink>("");
connect(source1->getPort(), transform1->getInputPort());
connect(source2->getPort(), transform2->getInputPort());
connect(source3->getPort(), transform3->getInputPort());
connect(transform1->getOutputPort(), limit1->getInputPort());
connect(transform2->getOutputPort(), limit2->getInputPort());
connect(transform3->getOutputPort(), limit3->getInputPort());
auto it = merge->getInputs().begin();
connect(limit1->getOutputPort(), *(it++));
connect(limit2->getOutputPort(), *(it++));
connect(limit3->getOutputPort(), *(it++));
connect(merge->getOutputs().front(), limit_fin->getInputPort());
connect(limit_fin->getOutputPort(), sink->getPort());
std::vector<ProcessorPtr> processors = {source1, source2, source3,
transform1, transform2, transform3,
limit1, limit2, limit3,
merge, limit_fin, sink};
// WriteBufferFromOStream out(std::cout);
// printPipeline(processors, out);
PipelineExecutor executor(processors);
executor.execute(num_threads);
};
auto even_time_single = Measure<>::execution(execute_chain, "Even distribution single thread", 0, 1, 2, 1);
auto even_time_mt = Measure<>::execution(execute_chain, "Even distribution multiple threads", 0, 1, 2, 4);
auto half_time_single = Measure<>::execution(execute_chain, "Half distribution single thread", 0, 31, 62, 1);
auto half_time_mt = Measure<>::execution(execute_chain, "Half distribution multiple threads", 0, 31, 62, 4);
auto ordered_time_single = Measure<>::execution(execute_chain, "Ordered distribution single thread", 0, 61, 122, 1);
auto ordered_time_mt = Measure<>::execution(execute_chain, "Ordered distribution multiple threads", 0, 61, 122, 4);
std::cout << "Single Thread [0:60:3] [1:60:3] [2:60:3] time: " << even_time_single << " ms.\n";
std::cout << "Multiple Threads [0:60:3] [1:60:3] [2:60:3] time:" << even_time_mt << " ms.\n";
std::cout << "Single Thread [0:60:3] [31:90:3] [62:120:3] time: " << half_time_single << " ms.\n";
std::cout << "Multiple Threads [0:60:3] [31:90:3] [62:120:3] time: " << half_time_mt << " ms.\n";
std::cout << "Single Thread [0:60:3] [61:120:3] [122:180:3] time: " << ordered_time_single << " ms.\n";
std::cout << "Multiple Threads [0:60:3] [61:120:3] [122:180:3] time: " << ordered_time_mt << " ms.\n";
return 0;
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(true) << '\n';
throw;
}

View File

@ -1,9 +1,3 @@
add_executable (system_numbers system_numbers.cpp)
target_link_libraries (system_numbers PRIVATE dbms clickhouse_storages_system clickhouse_common_io)
add_executable (storage_log storage_log.cpp)
target_link_libraries (storage_log PRIVATE dbms)
add_executable (part_name part_name.cpp) add_executable (part_name part_name.cpp)
target_link_libraries (part_name PRIVATE dbms) target_link_libraries (part_name PRIVATE dbms)

View File

@ -1,113 +0,0 @@
#include <iostream>
#include <IO/WriteBufferFromOStream.h>
#include <Storages/StorageLog.h>
#include <Formats/FormatFactory.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include <Interpreters/Context.h>
#include <Common/typeid_cast.h>
#include <Disks/DiskLocal.h>
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
int main(int, char **)
try
{
using namespace DB;
const size_t rows = 10000000;
/// create table with a pair of columns
NamesAndTypesList names_and_types;
names_and_types.emplace_back("a", std::make_shared<DataTypeUInt64>());
names_and_types.emplace_back("b", std::make_shared<DataTypeUInt8>());
SharedContextHolder shared_context = Context::createShared();
auto context = Context::createGlobal(shared_context.get());
context.makeGlobalContext();
context.setPath("./");
DiskPtr disk = std::make_unique<DiskLocal>("default", "./", 0);
StoragePtr table = StorageLog::create(disk, "table/", StorageID("test", "test"), ColumnsDescription{names_and_types}, ConstraintsDescription{}, 1048576);
table->startup();
/// write into it
{
Block block;
{
ColumnWithTypeAndName column;
column.name = "a";
column.type = table->getColumns().getPhysical("a").type;
auto col = column.type->createColumn();
ColumnUInt64::Container & vec = typeid_cast<ColumnUInt64 &>(*col).getData();
vec.resize(rows);
for (size_t i = 0; i < rows; ++i)
vec[i] = i;
column.column = std::move(col);
block.insert(column);
}
{
ColumnWithTypeAndName column;
column.name = "b";
column.type = table->getColumns().getPhysical("b").type;
auto col = column.type->createColumn();
ColumnUInt8::Container & vec = typeid_cast<ColumnUInt8 &>(*col).getData();
vec.resize(rows);
for (size_t i = 0; i < rows; ++i)
vec[i] = i * 2;
column.column = std::move(col);
block.insert(column);
}
BlockOutputStreamPtr out = table->write({}, context);
out->write(block);
}
/// read from it
{
Names column_names;
column_names.push_back("a");
column_names.push_back("b");
QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);
BlockInputStreamPtr in = std::make_shared<TreeExecutorBlockInputStream>(std::move(table->read(column_names, {}, context, stage, 8192, 1)[0]));
Block sample;
{
ColumnWithTypeAndName col;
col.type = std::make_shared<DataTypeUInt64>();
sample.insert(std::move(col));
}
{
ColumnWithTypeAndName col;
col.type = std::make_shared<DataTypeUInt8>();
sample.insert(std::move(col));
}
WriteBufferFromOStream out_buf(std::cout);
LimitBlockInputStream in_limit(in, 10, 0);
BlockOutputStreamPtr output = FormatFactory::instance().getOutput("TabSeparated", out_buf, sample, context);
copyData(in_limit, *output);
}
return 0;
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl;
return 1;
}

View File

@ -1,47 +0,0 @@
#include <iostream>
#include <IO/WriteBufferFromOStream.h>
#include <Storages/System/StorageSystemNumbers.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <Formats/FormatFactory.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
int main(int, char **)
try
{
using namespace DB;
StoragePtr table = StorageSystemNumbers::create(StorageID("test", "numbers"), false);
Names column_names;
column_names.push_back("number");
Block sample;
ColumnWithTypeAndName col;
col.type = std::make_shared<DataTypeUInt64>();
sample.insert(std::move(col));
WriteBufferFromOStream out_buf(std::cout);
SharedContextHolder shared_context = Context::createShared();
auto context = Context::createGlobal(shared_context.get());
context.makeGlobalContext();
QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);
auto stream = std::make_shared<TreeExecutorBlockInputStream>(std::move(table->read(column_names, {}, context, stage, 10, 1)[0]));
LimitBlockInputStream input(stream, 10, 96);
BlockOutputStreamPtr out = FormatFactory::instance().getOutput("TabSeparated", out_buf, sample, context);
copyData(input, *out);
return 0;
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl;
return 1;
}

View File

@ -1,28 +0,0 @@
create database if not exists test;
drop table if exists test.merge_distributed;
drop table if exists test.merge_distributed1;
create table test.merge_distributed1 ( CounterID UInt32, StartDate Date, Sign Int8, VisitID UInt64, UserID UInt64, StartTime DateTime, ClickLogID UInt64) ENGINE = CollapsingMergeTree(StartDate, intHash32(UserID), tuple(CounterID, StartDate, intHash32(UserID), VisitID, ClickLogID), 8192, Sign);
insert into test.merge_distributed1 values (1, '2013-09-19', 1, 0, 2, '2013-09-19 12:43:06', 3);
create table test.merge_distributed ( CounterID UInt32, StartDate Date, Sign Int8, VisitID UInt64, UserID UInt64, StartTime DateTime, ClickLogID UInt64) ENGINE = Distributed(self, test, merge_distributed1);
alter table test.merge_distributed1 add column dummy String after CounterID;
alter table test.merge_distributed add column dummy String after CounterID;
describe table test.merge_distributed;
show create table test.merge_distributed;
insert into test.merge_distributed1 values (1, 'Hello, Alter Table!','2013-09-19', 1, 0, 2, '2013-09-19 12:43:06', 3);
select CounterID, dummy from test.merge_distributed where dummy <> '' limit 10;
alter table test.merge_distributed drop column dummy;
describe table test.merge_distributed;
show create table test.merge_distributed;
--error: should fall, because there is no `dummy1` column
alter table test.merge_distributed add column dummy1 String after CounterID;
select CounterID, dummy1 from test.merge_distributed where dummy1 <> '' limit 10;

View File

@ -1,35 +0,0 @@
create database if not exists test;
drop table if exists test.merge;
drop table if exists test.merge1;
drop table if exists test.merge2;
create table test.merge1 ( CounterID UInt32, StartDate Date, Sign Int8, VisitID UInt64, UserID UInt64, StartTime DateTime, ClickLogID UInt64) ENGINE = CollapsingMergeTree(StartDate, intHash32(UserID), tuple(CounterID, StartDate, intHash32(UserID), VisitID, ClickLogID), 8192, Sign);
insert into test.merge1 values (1, '2013-09-19', 1, 0, 2, '2013-09-19 12:43:06', 3);
create table test.merge2 ( CounterID UInt32, StartDate Date, Sign Int8, VisitID UInt64, UserID UInt64, StartTime DateTime, ClickLogID UInt64) ENGINE = CollapsingMergeTree(StartDate, intHash32(UserID), tuple(CounterID, StartDate, intHash32(UserID), VisitID, ClickLogID), 8192, Sign);
insert into test.merge2 values (2, '2013-09-19', 1, 0, 2, '2013-09-19 12:43:06', 3);
create table test.merge ( CounterID UInt32, StartDate Date, Sign Int8, VisitID UInt64, UserID UInt64, StartTime DateTime, ClickLogID UInt64) ENGINE = Merge(test, 'merge\[0-9\]');
alter table test.merge1 add column dummy String after CounterID;
alter table test.merge2 add column dummy String after CounterID;
alter table test.merge add column dummy String after CounterID;
describe table test.merge;
show create table test.merge;
insert into test.merge1 values (1, 'Hello, Alter Table!','2013-09-19', 1, 0, 2, '2013-09-19 12:43:06', 3);
select CounterID, dummy from test.merge where dummy <> '' limit 10;
alter table test.merge drop column dummy;
describe table test.merge;
show create table test.merge;
--error: must correctly fall into the alter
alter table test.merge add column dummy1 String after CounterID;
select CounterID, dummy1 from test.merge where dummy1 <> '' limit 10;

View File

@ -1,17 +0,0 @@
create database if not exists test;
drop table if exists test.merge_tree;
create table test.merge_tree ( CounterID UInt32, StartDate Date, Sign Int8, VisitID UInt64, UserID UInt64, StartTime DateTime, ClickLogID UInt64) ENGINE = CollapsingMergeTree(StartDate, intHash32(UserID), tuple(CounterID, StartDate, intHash32(UserID), VisitID, ClickLogID), 8192, Sign);
insert into test.merge_tree values (1, '2013-09-19', 1, 0, 2, '2013-09-19 12:43:06', 3)
alter table test.merge_tree add column dummy String after CounterID;
describe table test.merge_tree;
insert into test.merge_tree values (1, 'Hello, Alter Table!','2013-09-19', 1, 0, 2, '2013-09-19 12:43:06', 3)
select CounterID, dummy from test.merge_tree where dummy <> '' limit 10;
alter table test.merge_tree drop column dummy;
describe table test.merge_tree;

View File

@ -0,0 +1,18 @@
CounterID UInt32
dummy String
StartDate Date
Sign Int8
VisitID UInt64
UserID UInt64
StartTime DateTime
ClickLogID UInt64
CREATE TABLE default.merge_distributed\n(\n `CounterID` UInt32, \n `dummy` String, \n `StartDate` Date, \n `Sign` Int8, \n `VisitID` UInt64, \n `UserID` UInt64, \n `StartTime` DateTime, \n `ClickLogID` UInt64\n)\nENGINE = Distributed(\'test_shard_localhost\', \'default\', \'merge_distributed1\')
1 Hello, Alter Table!
CounterID UInt32
StartDate Date
Sign Int8
VisitID UInt64
UserID UInt64
StartTime DateTime
ClickLogID UInt64
CREATE TABLE default.merge_distributed\n(\n `CounterID` UInt32, \n `StartDate` Date, \n `Sign` Int8, \n `VisitID` UInt64, \n `UserID` UInt64, \n `StartTime` DateTime, \n `ClickLogID` UInt64\n)\nENGINE = Distributed(\'test_shard_localhost\', \'default\', \'merge_distributed1\')

View File

@ -0,0 +1,28 @@
drop table if exists merge_distributed;
drop table if exists merge_distributed1;
create table merge_distributed1 ( CounterID UInt32, StartDate Date, Sign Int8, VisitID UInt64, UserID UInt64, StartTime DateTime, ClickLogID UInt64) ENGINE = CollapsingMergeTree(StartDate, intHash32(UserID), tuple(CounterID, StartDate, intHash32(UserID), VisitID, ClickLogID), 8192, Sign);
insert into merge_distributed1 values (1, '2013-09-19', 1, 0, 2, '2013-09-19 12:43:06', 3);
create table merge_distributed ( CounterID UInt32, StartDate Date, Sign Int8, VisitID UInt64, UserID UInt64, StartTime DateTime, ClickLogID UInt64) ENGINE = Distributed(test_shard_localhost, currentDatabase(), merge_distributed1);
alter table merge_distributed1 add column dummy String after CounterID;
alter table merge_distributed add column dummy String after CounterID;
describe table merge_distributed;
show create table merge_distributed;
insert into merge_distributed1 values (1, 'Hello, Alter Table!','2013-09-19', 1, 0, 2, '2013-09-19 12:43:06', 3);
select CounterID, dummy from merge_distributed where dummy <> '' limit 10;
alter table merge_distributed drop column dummy;
describe table merge_distributed;
show create table merge_distributed;
--error: should fall, because there is no `dummy1` column
alter table merge_distributed add column dummy1 String after CounterID;
select CounterID, dummy1 from merge_distributed where dummy1 <> '' limit 10; -- { serverError 47 }
drop table merge_distributed;
drop table merge_distributed1;

View File

@ -0,0 +1,17 @@
CounterID UInt32
dummy String
StartDate Date
Sign Int8
VisitID UInt64
UserID UInt64
StartTime DateTime
ClickLogID UInt64
CREATE TABLE default.merge\n(\n `CounterID` UInt32, \n `dummy` String, \n `StartDate` Date, \n `Sign` Int8, \n `VisitID` UInt64, \n `UserID` UInt64, \n `StartTime` DateTime, \n `ClickLogID` UInt64\n)\nENGINE = Merge(\'default\', \'merge\\\\[0-9\\\\]\')
CounterID UInt32
StartDate Date
Sign Int8
VisitID UInt64
UserID UInt64
StartTime DateTime
ClickLogID UInt64
CREATE TABLE default.merge\n(\n `CounterID` UInt32, \n `StartDate` Date, \n `Sign` Int8, \n `VisitID` UInt64, \n `UserID` UInt64, \n `StartTime` DateTime, \n `ClickLogID` UInt64\n)\nENGINE = Merge(\'default\', \'merge\\\\[0-9\\\\]\')

View File

@ -0,0 +1,36 @@
drop table if exists merge;
drop table if exists merge1;
drop table if exists merge2;
create table merge1 ( CounterID UInt32, StartDate Date, Sign Int8, VisitID UInt64, UserID UInt64, StartTime DateTime, ClickLogID UInt64) ENGINE = CollapsingMergeTree(StartDate, intHash32(UserID), tuple(CounterID, StartDate, intHash32(UserID), VisitID, ClickLogID), 8192, Sign);
insert into merge1 values (1, '2013-09-19', 1, 0, 2, '2013-09-19 12:43:06', 3);
create table merge2 ( CounterID UInt32, StartDate Date, Sign Int8, VisitID UInt64, UserID UInt64, StartTime DateTime, ClickLogID UInt64) ENGINE = CollapsingMergeTree(StartDate, intHash32(UserID), tuple(CounterID, StartDate, intHash32(UserID), VisitID, ClickLogID), 8192, Sign);
insert into merge2 values (2, '2013-09-19', 1, 0, 2, '2013-09-19 12:43:06', 3);
create table merge ( CounterID UInt32, StartDate Date, Sign Int8, VisitID UInt64, UserID UInt64, StartTime DateTime, ClickLogID UInt64) ENGINE = Merge(currentDatabase(), 'merge\[0-9\]');
alter table merge1 add column dummy String after CounterID;
alter table merge2 add column dummy String after CounterID;
alter table merge add column dummy String after CounterID;
describe table merge;
show create table merge;
insert into merge1 values (1, 'Hello, Alter Table!','2013-09-19', 1, 0, 2, '2013-09-19 12:43:06', 3);
select CounterID, dummy from merge where dummy <> '' limit 10;
alter table merge drop column dummy;
describe table merge;
show create table merge;
--error: must correctly fall into the alter
alter table merge add column dummy1 String after CounterID;
select CounterID, dummy1 from merge where dummy1 <> '' limit 10;
drop table merge;
drop table merge1;
drop table merge2;

View File

@ -0,0 +1,16 @@
CounterID UInt32
dummy String
StartDate Date
Sign Int8
VisitID UInt64
UserID UInt64
StartTime DateTime
ClickLogID UInt64
1 Hello, Alter Table!
CounterID UInt32
StartDate Date
Sign Int8
VisitID UInt64
UserID UInt64
StartTime DateTime
ClickLogID UInt64

View File

@ -0,0 +1,17 @@
drop table if exists merge_tree;
create table merge_tree ( CounterID UInt32, StartDate Date, Sign Int8, VisitID UInt64, UserID UInt64, StartTime DateTime, ClickLogID UInt64) ENGINE = CollapsingMergeTree(StartDate, intHash32(UserID), tuple(CounterID, StartDate, intHash32(UserID), VisitID, ClickLogID), 8192, Sign);
insert into merge_tree values (1, '2013-09-19', 1, 0, 2, '2013-09-19 12:43:06', 3)
alter table merge_tree add column dummy String after CounterID;
describe table merge_tree;
insert into merge_tree values (1, 'Hello, Alter Table!','2013-09-19', 1, 0, 2, '2013-09-19 12:43:06', 3)
select CounterID, dummy from merge_tree where dummy <> '' limit 10;
alter table merge_tree drop column dummy;
describe table merge_tree;
drop table merge_tree;