dbms: compiled aggregator: development [#METR-2944].

This commit is contained in:
Alexey Milovidov 2015-01-10 05:30:03 +03:00
parent 6673251a20
commit a7a2712630
19 changed files with 450 additions and 275 deletions

View File

@ -8,7 +8,7 @@
SOURCE_PATH=${1:-.}
DST=${2:-$SOURCE_PATH/../headers};
for i in $(/usr/bin/c++ -M -xc++ -std=gnu++1y -Wall -Werror -march=native -O3 -g -shared -fPIC -rdynamic \
for i in $(clang -M -xc++ -std=gnu++1y -Wall -Werror -march=native -O3 -g -fPIC \
$(cat $SOURCE_PATH/CMakeLists.txt | grep include_directories | grep -v METRICA_BINARY_DIR | sed -e "s!\${METRICA_SOURCE_DIR}!$SOURCE_PATH!; s!include_directories (!-I !; s!)!!;" | tr '\n' ' ') \
$SOURCE_PATH/dbms/include/DB/Interpreters/SpecializedAggregator.h |
tr -d '\\' |

View File

@ -1,3 +1,5 @@
#pragma once
#include <DB/Common/HashTable/Hash.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>

View File

@ -132,7 +132,7 @@ bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs);
namespace std
{
template<> inline void swap<DB::Block>(DB::Block & one, DB::Block & another) noexcept
template<> inline void swap<DB::Block>(DB::Block & one, DB::Block & another)
{
one.swap(another);
}

View File

@ -19,8 +19,9 @@ class AggregatingBlockInputStream : public IProfilingBlockInputStream
{
public:
AggregatingBlockInputStream(BlockInputStreamPtr input_, const ColumnNumbers & keys_, AggregateDescriptions & aggregates_,
bool overflow_row_, bool final_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_)
: aggregator(new Aggregator(keys_, aggregates_, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_)),
bool overflow_row_, bool final_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
Compiler * compiler_, UInt32 min_count_to_compile_)
: aggregator(keys_, aggregates_, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_, compiler_, min_count_to_compile_),
final(final_)
{
children.push_back(input_);
@ -31,21 +32,27 @@ public:
* Столбцы, соответствующие keys и аргументам агрегатных функций, уже должны быть вычислены.
*/
AggregatingBlockInputStream(BlockInputStreamPtr input_, const Names & key_names, const AggregateDescriptions & aggregates,
bool overflow_row_, bool final_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_);
bool overflow_row_, bool final_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
Compiler * compiler_, UInt32 min_count_to_compile_)
: aggregator(key_names, aggregates, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_, compiler_, min_count_to_compile_),
final(final_)
{
children.push_back(input_);
}
String getName() const override { return "AggregatingBlockInputStream"; }
String getID() const override
{
std::stringstream res;
res << "Aggregating(" << children.back()->getID() << ", " << aggregator->getID() << ")";
res << "Aggregating(" << children.back()->getID() << ", " << aggregator.getID() << ")";
return res.str();
}
protected:
Block readImpl() override;
SharedPtr<Aggregator> aggregator;
Aggregator aggregator;
bool final;
bool executed = false;

View File

@ -18,14 +18,16 @@ class MergingAggregatedBlockInputStream : public IProfilingBlockInputStream
public:
MergingAggregatedBlockInputStream(BlockInputStreamPtr input_, const ColumnNumbers & keys_,
const AggregateDescriptions & aggregates_, bool overflow_row_, bool final_, size_t max_threads_)
: aggregator(new Aggregator(keys_, aggregates_, overflow_row_)), final(final_), max_threads(max_threads_)
: aggregator(keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0),
final(final_), max_threads(max_threads_)
{
children.push_back(input_);
}
MergingAggregatedBlockInputStream(BlockInputStreamPtr input_, const Names & keys_names_,
const AggregateDescriptions & aggregates_, bool overflow_row_, bool final_, size_t max_threads_)
: aggregator(new Aggregator(keys_names_, aggregates_, overflow_row_)), final(final_), max_threads(max_threads_)
: aggregator(keys_names_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0),
final(final_), max_threads(max_threads_)
{
children.push_back(input_);
}
@ -35,7 +37,7 @@ public:
String getID() const override
{
std::stringstream res;
res << "MergingAggregated(" << children.back()->getID() << ", " << aggregator->getID() << ")";
res << "MergingAggregated(" << children.back()->getID() << ", " << aggregator.getID() << ")";
return res.str();
}
@ -43,7 +45,7 @@ protected:
Block readImpl() override;
private:
SharedPtr<Aggregator> aggregator;
Aggregator aggregator;
bool final;
size_t max_threads;

View File

@ -21,8 +21,9 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
public:
ParallelAggregatingBlockInputStream(BlockInputStreams inputs, const ColumnNumbers & keys_,
AggregateDescriptions & aggregates_, bool overflow_row_, bool final_, size_t max_threads_,
size_t max_rows_to_group_by_ = 0, OverflowMode group_by_overflow_mode_ = OverflowMode::THROW)
: aggregator(keys_, aggregates_, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_),
size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
Compiler * compiler_, UInt32 min_count_to_compile_)
: aggregator(keys_, aggregates_, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_, compiler_, min_count_to_compile_),
final(final_), max_threads(std::min(inputs.size(), max_threads_)),
keys_size(keys_.size()), aggregates_size(aggregates_.size()),
handler(*this), processor(inputs, max_threads, handler)
@ -34,8 +35,9 @@ public:
*/
ParallelAggregatingBlockInputStream(BlockInputStreams inputs, const Names & key_names,
const AggregateDescriptions & aggregates, bool overflow_row_, bool final_, size_t max_threads_,
size_t max_rows_to_group_by_ = 0, OverflowMode group_by_overflow_mode_ = OverflowMode::THROW)
: aggregator(key_names, aggregates, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_),
size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
Compiler * compiler_, UInt32 min_count_to_compile_)
: aggregator(key_names, aggregates, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_, compiler_, min_count_to_compile_),
final(final_), max_threads(std::min(inputs.size(), max_threads_)),
keys_size(key_names.size()), aggregates_size(aggregates.size()),
handler(*this), processor(inputs, max_threads, handler)

View File

@ -15,6 +15,7 @@
#include <DB/Interpreters/AggregateDescription.h>
#include <DB/Interpreters/AggregationCommon.h>
#include <DB/Interpreters/Limits.h>
#include <DB/Interpreters/Compiler.h>
#include <DB/Columns/ColumnString.h>
#include <DB/Columns/ColumnFixedString.h>
@ -606,10 +607,11 @@ class Aggregator
{
public:
Aggregator(const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_,
size_t max_rows_to_group_by_ = 0, OverflowMode group_by_overflow_mode_ = OverflowMode::THROW)
size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_, Compiler * compiler_, UInt32 min_count_to_compile_)
: keys(keys_), aggregates(aggregates_), aggregates_size(aggregates.size()),
overflow_row(overflow_row_),
max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_)
max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
compiler(compiler_), min_count_to_compile(min_count_to_compile_)
{
std::sort(keys.begin(), keys.end());
keys.erase(std::unique(keys.begin(), keys.end()), keys.end());
@ -617,10 +619,11 @@ public:
}
Aggregator(const Names & key_names_, const AggregateDescriptions & aggregates_, bool overflow_row_,
size_t max_rows_to_group_by_ = 0, OverflowMode group_by_overflow_mode_ = OverflowMode::THROW)
size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_, Compiler * compiler_, UInt32 min_count_to_compile_)
: key_names(key_names_), aggregates(aggregates_), aggregates_size(aggregates.size()),
overflow_row(overflow_row_),
max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_)
max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
compiler(compiler_), min_count_to_compile(min_count_to_compile_)
{
std::sort(key_names.begin(), key_names.end());
key_names.erase(std::unique(key_names.begin(), key_names.end()), key_names.end());
@ -630,8 +633,9 @@ public:
/// Агрегировать источник. Получить результат в виде одной из структур данных.
void execute(BlockInputStreamPtr stream, AggregatedDataVariants & result);
typedef std::vector<ConstColumnPlainPtrs> AggregateColumns;
typedef std::vector<ColumnAggregateFunction::Container_t *> AggregateColumnsData;
using AggregateColumns = std::vector<ConstColumnPlainPtrs>;
using AggregateColumnsData = std::vector<ColumnAggregateFunction::Container_t *>;
using AggregateFunctionsPlainPtrs = std::vector<IAggregateFunction *>;
/// Обработать один блок. Вернуть false, если обработку следует прервать (при group_by_overflow_mode = 'break').
bool executeOnBlock(Block & block, AggregatedDataVariants & result,
@ -669,7 +673,7 @@ protected:
ColumnNumbers keys;
Names key_names;
AggregateDescriptions aggregates;
std::vector<IAggregateFunction *> aggregate_functions;
AggregateFunctionsPlainPtrs aggregate_functions;
size_t keys_size;
size_t aggregates_size;
/// Нужно ли класть в AggregatedDataVariants::without_key агрегаты для ключей, не попавших в max_rows_to_group_by.
@ -690,6 +694,18 @@ protected:
Logger * log = &Logger::get("Aggregator");
/** Динамически скомпилированная библиотека для агрегации, если есть.
*/
Compiler * compiler = nullptr;
UInt32 min_count_to_compile;
Compiler::SharedLibraryPtr compiled_aggregator;
const void * compiled_method_ptr = nullptr;
bool compiled_if_possible = false;
void compileIfPossible(AggregatedDataVariants::Type type);
/** Если заданы только имена столбцов (key_names, а также aggregates[i].column_name), то вычислить номера столбцов.
* Сформировать блок - пример результата.
*/

View File

@ -1,7 +1,6 @@
#pragma once
#include <dlfcn.h>
#include <stdio.h>
#include <string>
#include <mutex>
@ -9,22 +8,12 @@
#include <unordered_set>
#include <unordered_map>
#include <Poco/DirectoryIterator.h>
#include <Yandex/Revision.h>
#include <Yandex/logger_useful.h>
#include <statdaemons/threadpool.hpp>
#include <DB/Core/Types.h>
#include <DB/Core/Exception.h>
#include <DB/Common/SipHash.h>
#include <DB/Common/UInt128.h>
#include <DB/IO/Operators.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/ReadBufferFromString.h>
#include <DB/IO/ReadBufferFromFileDescriptor.h>
#include <DB/IO/copyData.h>
#include <DB/IO/WriteBufferFromFile.h>
namespace DB
@ -79,27 +68,11 @@ public:
* Результаты компиляции сохраняются при перезапуске сервера,
* но используют в качестве части ключа номер ревизии. То есть, устаревают при обновлении сервера.
*/
Compiler(const std::string & path_, size_t threads)
: path(path_), pool(threads)
{
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator dir_it(path); dir_end != dir_it; ++dir_it)
{
std::string name = dir_it.name();
if (name.length() > strlen(".so") && 0 == name.compare(name.size() - 3, 3, ".so"))
{
files.insert(name.substr(0, name.size() - 3));
}
}
Compiler(const std::string & path_, size_t threads);
~Compiler();
LOG_INFO(log, "Having " << files.size() << " compiled files from previous start.");
}
~Compiler()
{
LOG_DEBUG(log, "Waiting for threads to finish.");
pool.wait();
}
using HashedKey = UInt128;
using SharedLibraryPtr = std::shared_ptr<SharedLibrary>;
/** Увеличить счётчик для заданного ключа key на единицу.
* Если результат компиляции уже есть (уже открыт, или есть файл с библиотекой),
@ -108,67 +81,14 @@ public:
* инициировать компиляцию в отдельном потоке, если есть свободные потоки, и вернуть nullptr.
* Иначе вернуть nullptr.
*/
std::shared_ptr<SharedLibrary> getOrCount(
SharedLibraryPtr getOrCount(
const std::string & key,
UInt32 min_count_to_compile,
std::function<std::string()> get_code)
{
HashedKey hashed_key = getHash(key);
std::lock_guard<std::mutex> lock(mutex);
UInt32 count = ++counts[hashed_key];
/// Есть готовая открытая библиотека? Или, если библиотека в процессе компиляции, там будет nullptr.
Libraries::iterator it = libraries.find(hashed_key);
if (libraries.end() != it)
{
if (!it->second)
LOG_INFO(log, "Library " << hashedKeyToFileName(hashed_key) << " is compiling.");
return it->second;
}
/// Есть файл с библиотекой, оставшийся от предыдущего запуска?
std::string file_name = hashedKeyToFileName(hashed_key);
if (files.count(file_name))
return libraries.emplace(std::piecewise_construct,
std::forward_as_tuple(hashed_key),
std::forward_as_tuple(new SharedLibrary(path + '/' + file_name + ".so"))).first->second;
/// Достигнуто ли min_count_to_compile?
if (count >= min_count_to_compile)
{
/// Есть ли свободные потоки.
if (pool.active() < pool.size())
{
/// Обозначает, что библиотека в процессе компиляции.
libraries[hashed_key] = nullptr;
pool.schedule([=]
{
try
{
compile(hashed_key, file_name, get_code);
}
catch (...)
{
tryLogCurrentException("Compiler");
}
});
}
else
LOG_INFO(log, "All threads are busy.");
}
return nullptr;
}
std::function<std::string()> get_code);
private:
using HashedKey = UInt128;
using Counts = std::unordered_map<HashedKey, UInt32, UInt128Hash>;
using Libraries = std::unordered_map<HashedKey, std::shared_ptr<SharedLibrary>, UInt128Hash>;
using Libraries = std::unordered_map<HashedKey, SharedLibraryPtr, UInt128Hash>;
using Files = std::unordered_set<std::string>;
const std::string path;
@ -188,137 +108,7 @@ private:
Logger * log = &Logger::get("Compiler");
static HashedKey getHash(const std::string & key)
{
SipHash hash;
auto revision = Revision::get();
hash.update(reinterpret_cast<const char *>(&revision), sizeof(revision));
hash.update(key.data(), key.size());
HashedKey res;
hash.get128(res.first, res.second);
return res;
}
/// Без расширения .so.
static std::string hashedKeyToFileName(HashedKey hashed_key)
{
std::string file_name;
{
WriteBufferFromString out(file_name);
out << hashed_key.first << '_' << hashed_key.second;
}
return file_name;
}
static HashedKey fileNameToHashedKey(const std::string & file_name)
{
HashedKey hashed_key;
{
ReadBufferFromString in(file_name);
in >> hashed_key.first >> "_" >> hashed_key.second;
}
return hashed_key;
}
struct Pipe : private boost::noncopyable
{
FILE * f;
Pipe(const std::string & command)
{
errno = 0;
f = popen(command.c_str(), "r");
if (!f)
throwFromErrno("Cannot popen");
}
~Pipe()
{
try
{
errno = 0;
if (f && -1 == pclose(f))
throwFromErrno("Cannot pclose");
}
catch (...)
{
tryLogCurrentException("Pipe");
}
}
};
void compile(HashedKey hashed_key, std::string file_name, std::function<std::string()> get_code)
{
LOG_INFO(log, "Compiling code " << file_name);
std::string prefix = path + "/" + file_name;
std::string cpp_file_path = prefix + ".cpp";
std::string so_file_path = prefix + ".so";
{
WriteBufferFromFile out(cpp_file_path);
out << get_code();
}
std::stringstream command;
/// Слегка неудобно.
command <<
"/usr/share/clickhouse/bin/clang"
" -x c++ -std=gnu++11 -O3 -g -Wall -Werror -Wnon-virtual-dtor -march=native -D NDEBUG"
" -shared -fPIC -fvisibility=hidden -fno-implement-inlines"
" -isystem /usr/share/clickhouse/headers/usr/local/include/"
" -isystem /usr/share/clickhouse/headers/usr/include/"
" -isystem /usr/share/clickhouse/headers/usr/include/c++/4.8/"
" -isystem /usr/share/clickhouse/headers/usr/include/x86_64-linux-gnu/c++/4.8/"
" -isystem /usr/share/clickhouse/headers/usr/lib/gcc/x86_64-linux-gnu/4.8/include/"
" -I /usr/share/clickhouse/headers/dbms/include/"
" -I /usr/share/clickhouse/headers/libs/libcityhash/"
" -I /usr/share/clickhouse/headers/libs/libcommon/include/"
" -I /usr/share/clickhouse/headers/libs/libdouble-conversion/src/"
" -I /usr/share/clickhouse/headers/libs/libmysqlxx/include/"
" -I /usr/share/clickhouse/headers/libs/libstatdaemons/include/"
" -I /usr/share/clickhouse/headers/libs/libstats/include/"
" -o " << so_file_path << " " << cpp_file_path
<< " 2>&1 || echo Exit code: $?";
std::string compile_result;
{
Pipe pipe(command.str());
int pipe_fd = fileno(pipe.f);
if (-1 == pipe_fd)
throwFromErrno("Cannot fileno");
{
ReadBufferFromFileDescriptor command_output(pipe_fd);
WriteBufferFromString res(compile_result);
copyData(command_output, res);
}
}
if (!compile_result.empty())
throw Exception("Cannot compile code:\n\n" + command.str() + "\n\n" + compile_result);
/// Если до этого была ошибка, то файл с кодом остаётся для возможности просмотра.
Poco::File(cpp_file_path).remove();
{
std::lock_guard<std::mutex> lock(mutex);
libraries[hashed_key].reset(new SharedLibrary(so_file_path));
}
LOG_INFO(log, "Compiled code " << file_name);
}
void compile(HashedKey hashed_key, std::string file_name, std::function<std::string()> get_code);
};
}

View File

@ -27,6 +27,7 @@
#include <DB/Interpreters/ProcessList.h>
#include <DB/Interpreters/Cluster.h>
#include <DB/Interpreters/InterserverIOHandler.h>
#include <DB/Interpreters/Compiler.h>
#include <DB/Client/ConnectionPool.h>
#include <statdaemons/ConfigProcessor.h>
#include <zkutil/ZooKeeper.h>
@ -99,6 +100,7 @@ struct ContextShared
InterserverIOHandler interserver_io_handler; /// Обработчик для межсерверной передачи данных.
BackgroundProcessingPoolPtr background_pool; /// Пул потоков для фоновой работы, выполняемой таблицами.
Macros macros; /// Подстановки из конфига.
Compiler compiler { path + "build/", 1 }; /// Для динамической компиляции частей запроса, при необходимости.
/// Кластеры для distributed таблиц
/// Создаются при создании Distributed таблиц, так как нужно дождаться пока будут выставлены Settings
@ -334,6 +336,8 @@ public:
void initClusters();
Cluster & getCluster(const std::string & cluster_name);
Compiler & getCompiler();
void shutdown() { shared->shutdown(); }
};

View File

@ -86,6 +86,11 @@ struct Settings
\
/** Сэмплирование по умолчанию. Если равно 1, то отключено. */ \
M(SettingFloat, default_sample, 1.0) \
\
/** Включена ли компиляция запросов. */ \
M(SettingBool, compile, false) \
/** Количество одинаковых по структуре запросов перед тем, как инициируется их компиляция. */ \
M(SettingUInt64, min_count_to_compile, 0) \
/// Всевозможные ограничения на выполнение запроса.
Limits limits;

View File

@ -34,16 +34,18 @@ class SplittingAggregator : private Aggregator
{
public:
SplittingAggregator(const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, size_t threads_,
bool with_totals_, size_t max_rows_to_group_by_ = 0, OverflowMode group_by_overflow_mode_ = OverflowMode::THROW)
: Aggregator(keys_, aggregates_, with_totals_, max_rows_to_group_by_, group_by_overflow_mode_), threads(threads_), pool(threads),
bool with_totals_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_)
: Aggregator(keys_, aggregates_, with_totals_, max_rows_to_group_by_, group_by_overflow_mode_, nullptr, 0),
threads(threads_), pool(threads),
log(&Logger::get("SplittingAggregator")), method(AggregatedDataVariants::Type::EMPTY),
key_columns(keys_size), aggregate_columns(aggregates_size), rows(0), src_rows(0), src_bytes(0), size_of_all_results(0)
{
}
SplittingAggregator(const Names & key_names_, const AggregateDescriptions & aggregates_, size_t threads_,
bool with_totals_, size_t max_rows_to_group_by_ = 0, OverflowMode group_by_overflow_mode_ = OverflowMode::THROW)
: Aggregator(key_names_, aggregates_, with_totals_, max_rows_to_group_by_, group_by_overflow_mode_), threads(threads_), pool(threads),
bool with_totals_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_)
: Aggregator(key_names_, aggregates_, with_totals_, max_rows_to_group_by_, group_by_overflow_mode_, nullptr, 0),
threads(threads_), pool(threads),
log(&Logger::get("SplittingAggregator")), method(AggregatedDataVariants::Type::EMPTY),
key_columns(keys_size), aggregate_columns(aggregates_size), rows(0), src_rows(0), src_bytes(0), size_of_all_results(0)
{

View File

@ -7,26 +7,14 @@ namespace DB
{
AggregatingBlockInputStream::AggregatingBlockInputStream(BlockInputStreamPtr input_,
const Names & key_names, const AggregateDescriptions & aggregates,
bool overflow_row_, bool final_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_)
: final(final_)
{
children.push_back(input_);
aggregator = new Aggregator(key_names, aggregates, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_);
}
Block AggregatingBlockInputStream::readImpl()
{
if (!executed)
{
executed = true;
AggregatedDataVariants data_variants;
aggregator->execute(children.back(), data_variants);
blocks = aggregator->convertToBlocks(data_variants, final, 1);
aggregator.execute(children.back(), data_variants);
blocks = aggregator.convertToBlocks(data_variants, final, 1);
it = blocks.begin();
}

View File

@ -13,8 +13,8 @@ Block MergingAggregatedBlockInputStream::readImpl()
{
executed = true;
AggregatedDataVariants data_variants;
aggregator->mergeStream(children.back(), data_variants, max_threads);
blocks = aggregator->convertToBlocks(data_variants, final, max_threads);
aggregator.mergeStream(children.back(), data_variants, max_threads);
blocks = aggregator.convertToBlocks(data_variants, final, max_threads);
it = blocks.begin();
}

View File

@ -91,7 +91,8 @@ int main(int argc, char ** argv)
}
DB::BlockInputStreamPtr stream = new DB::OneBlockInputStream(block);
stream = new DB::AggregatingBlockInputStream(stream, key_column_numbers, aggregate_descriptions, false, true, 0, DB::OverflowMode::THROW);
stream = new DB::AggregatingBlockInputStream(stream, key_column_numbers, aggregate_descriptions, false, true,
0, DB::OverflowMode::THROW, nullptr);
DB::WriteBufferFromOStream ob(std::cout);
DB::RowOutputStreamPtr row_out = new DB::TabSeparatedRowOutputStream(ob, sample);

View File

@ -127,6 +127,97 @@ void Aggregator::initialize(Block & block)
}
void Aggregator::compileIfPossible(AggregatedDataVariants::Type type)
{
std::lock_guard<std::mutex> lock(mutex);
if (compiled_if_possible)
return;
compiled_if_possible = true;
std::string method_typename;
#define M(NAME, IS_TWO_LEVEL) \
else if (type == AggregatedDataVariants::Type::NAME) \
method_typename = "decltype(AggregatedDataVariants::" ## NAME ## ")::element_type";
if (false) {}
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
else if (type != AggregatedDataVariants::Type::without_key) /// TODO Реализовать для without_key
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
/// Список типов агрегатных функций.
std::stringstream aggregate_functions_typenames_str;
for (size_t i = 0; i < aggregates_size; ++i)
{
int status = 0;
char * type_name_ptr = abi::__cxa_demangle(typeid(*aggregate_functions[i]).name(), 0, 0, &status);
std::string type_name = type_name_ptr;
free(type_name_ptr);
if (status)
throw Exception("Cannot compile code: cannot demangle name " + String(typeid(*aggregate_functions[i]).name())
+ ", status: " + toString(status)/* TODO ErrorCodes*/);
aggregate_functions_typenames_str << ((i != 0) ? ", " : "") << type_name;
}
std::string aggregate_functions_typenames = aggregate_functions_typenames_str.str();
std::string key = "Aggregate: " + method_typename + ", " + aggregate_functions_typenames;
auto get_code = [method_typename, aggregate_functions_typenames]
{
/// Короткий кусок кода, представляющий собой явное инстанцирование шаблона.
std::stringstream code;
code <<
"#include <DB/Interpreters/SpecializedAggregator.h>\n"
"\n"
"namespace DB\n"
"{\n"
"\n"
"template void Aggregator::executeSpecialized<\n"
"\t" << method_typename << ", TypeList<" << aggregate_functions_typenames << ">>(\n"
"\t" << method_typename << " &, Arena *, size_t, ConstColumnPlainPtrs &,\n"
"\tAggregateColumns &, const Sizes &, StringRefs &, bool, AggregateDataPtr) const;\n"
"\n"
"static void wrapper(\n"
"\tconst Aggregator & aggregator,\n"
"\t" << method_typename << " & method,\n"
"\tArena * arena,\n"
"\tsize_t rows,\n"
"\tConstColumnPlainPtrs & key_columns,\n"
"\tAggregator::AggregateColumns & aggregate_columns,\n"
"\tconst Sizes & key_sizes,\n"
"\tStringRefs & keys,\n"
"\tbool no_more_keys,\n"
"\tAggregateDataPtr overflow_row)\n"
"{\n"
"\taggregator.executeSpecialized<\n"
"\t\t" << method_typename << ", TypeList<" << aggregate_functions_typenames << ">>(\n"
"\t\tmethod, arena, rows, key_columns, aggregate_columns, key_sizes, keys, no_more_keys, overflow_row);\n"
"}\n"
"\n"
"const void * getPtr() __attribute__((__visibility__(\"default\")));\n"
"const void * getPtr()\n" /// Без этой обёртки непонятно, как достать нужный символ из скомпилированной библиотеки.
"{\n"
"\treturn reinterpret_cast<const void *>(&wrapper);\n"
"}\n"
"\n"
"}\n";
return code.str();
};
compiled_aggregator = compiler->getOrCount(key, min_count_to_compile, get_code);
if (compiled_aggregator)
compiled_method_ptr = compiled_aggregator->get<const void * (*) ()>("_ZN2DB6getPtrEv")();
}
AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ConstColumnPlainPtrs & key_columns, Sizes & key_sizes)
{
bool keys_fit_128_bits = true;
@ -352,6 +443,9 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
result.keys_size = keys_size;
result.key_sizes = key_sizes;
LOG_TRACE(log, "Aggregation method: " << result.getMethodName());
if (compiler)
compileIfPossible(result.type);
}
if ((overflow_row || result.type == AggregatedDataVariants::Type::without_key) && !result.without_key)
@ -384,16 +478,36 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
AggregateDataPtr overflow_row_ptr = overflow_row ? result.without_key : nullptr;
#define M(NAME, IS_TWO_LEVEL) \
if (compiled_method_ptr) /// NOTE Можно динамически начинать применять компилированный агрегатор, когда он станет готов, во время запроса.
{
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
reinterpret_cast<void (*)( \
const Aggregator &, decltype(result.NAME)::element_type &, \
Arena *, size_t, ConstColumnPlainPtrs &, AggregateColumns &, \
const Sizes &, StringRefs &, bool, AggregateDataPtr)>(compiled_method_ptr) \
(*this, *result.NAME, result.aggregates_pool, rows, key_columns, aggregate_columns, \
result.key_sizes, key, no_more_keys, overflow_row_ptr);
if (false) {}
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
else if (result.type != AggregatedDataVariants::Type::without_key)
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
}
else
{
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
executeImpl(*result.NAME, result.aggregates_pool, rows, key_columns, aggregate_columns, \
result.key_sizes, key, no_more_keys, overflow_row_ptr);
if (false) {}
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
#undef M
else if (result.type != AggregatedDataVariants::Type::without_key)
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
}
size_t result_size = result.size();

View File

@ -0,0 +1,230 @@
#include <stdio.h>
#include <Poco/DirectoryIterator.h>
#include <Yandex/Revision.h>
#include <DB/Common/SipHash.h>
#include <DB/IO/Operators.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/ReadBufferFromString.h>
#include <DB/IO/ReadBufferFromFileDescriptor.h>
#include <DB/IO/copyData.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/Interpreters/Compiler.h>
namespace DB
{
Compiler::Compiler(const std::string & path_, size_t threads)
: path(path_), pool(threads)
{
Poco::File(path).createDirectory();
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator dir_it(path); dir_end != dir_it; ++dir_it)
{
std::string name = dir_it.name();
if (name.length() > strlen(".so") && 0 == name.compare(name.size() - 3, 3, ".so"))
{
files.insert(name.substr(0, name.size() - 3));
}
}
LOG_INFO(log, "Having " << files.size() << " compiled files from previous start.");
}
Compiler::~Compiler()
{
LOG_DEBUG(log, "Waiting for threads to finish.");
pool.wait();
}
static Compiler::HashedKey getHash(const std::string & key)
{
SipHash hash;
auto revision = Revision::get();
hash.update(reinterpret_cast<const char *>(&revision), sizeof(revision));
hash.update(key.data(), key.size());
Compiler::HashedKey res;
hash.get128(res.first, res.second);
return res;
}
/// Без расширения .so.
static std::string hashedKeyToFileName(Compiler::HashedKey hashed_key)
{
std::string file_name;
{
WriteBufferFromString out(file_name);
out << hashed_key.first << '_' << hashed_key.second;
}
return file_name;
}
Compiler::SharedLibraryPtr Compiler::getOrCount(
const std::string & key,
UInt32 min_count_to_compile,
std::function<std::string()> get_code)
{
HashedKey hashed_key = getHash(key);
std::lock_guard<std::mutex> lock(mutex);
UInt32 count = ++counts[hashed_key];
/// Есть готовая открытая библиотека? Или, если библиотека в процессе компиляции, там будет nullptr.
Libraries::iterator it = libraries.find(hashed_key);
if (libraries.end() != it)
{
if (!it->second)
LOG_INFO(log, "Library " << hashedKeyToFileName(hashed_key) << " is compiling.");
return it->second;
}
/// Есть файл с библиотекой, оставшийся от предыдущего запуска?
std::string file_name = hashedKeyToFileName(hashed_key);
if (files.count(file_name))
return libraries.emplace(std::piecewise_construct,
std::forward_as_tuple(hashed_key),
std::forward_as_tuple(new SharedLibrary(path + '/' + file_name + ".so"))).first->second;
/// Достигнуто ли min_count_to_compile?
if (count >= min_count_to_compile)
{
/// TODO Значение min_count_to_compile, равное нулю, обозначает необходимость синхронной компиляции.
/// Есть ли свободные потоки.
if (pool.active() < pool.size())
{
/// Обозначает, что библиотека в процессе компиляции.
libraries[hashed_key] = nullptr;
pool.schedule([=]
{
try
{
compile(hashed_key, file_name, get_code);
}
catch (...)
{
tryLogCurrentException("Compiler");
}
});
}
else
LOG_INFO(log, "All threads are busy.");
}
return nullptr;
}
struct Pipe : private boost::noncopyable
{
FILE * f;
Pipe(const std::string & command)
{
errno = 0;
f = popen(command.c_str(), "r");
if (!f)
throwFromErrno("Cannot popen");
}
~Pipe()
{
try
{
errno = 0;
if (f && -1 == pclose(f))
throwFromErrno("Cannot pclose");
}
catch (...)
{
tryLogCurrentException("Pipe");
}
}
};
void Compiler::compile(HashedKey hashed_key, std::string file_name, std::function<std::string()> get_code)
{
LOG_INFO(log, "Compiling code " << file_name);
std::string prefix = path + "/" + file_name;
std::string cpp_file_path = prefix + ".cpp";
std::string so_file_path = prefix + ".so";
{
WriteBufferFromFile out(cpp_file_path);
out << get_code();
}
std::stringstream command;
/// Слегка неудобно.
command <<
"/usr/share/clickhouse/bin/clang"
" -x c++ -std=gnu++11 -O3 -g -Wall -Werror -Wnon-virtual-dtor -march=native -D NDEBUG"
" -shared -fPIC -fvisibility=hidden -fno-implement-inlines"
" -isystem /usr/share/clickhouse/headers/usr/local/include/"
" -isystem /usr/share/clickhouse/headers/usr/include/"
" -isystem /usr/share/clickhouse/headers/usr/include/c++/4.8/"
" -isystem /usr/share/clickhouse/headers/usr/include/x86_64-linux-gnu/c++/4.8/"
" -isystem /usr/share/clickhouse/headers/usr/local/lib/clang/3.6.0/include/"
" -I /usr/share/clickhouse/headers/dbms/include/"
" -I /usr/share/clickhouse/headers/libs/libcityhash/"
" -I /usr/share/clickhouse/headers/libs/libcommon/include/"
" -I /usr/share/clickhouse/headers/libs/libdouble-conversion/"
" -I /usr/share/clickhouse/headers/libs/libmysqlxx/include/"
" -I /usr/share/clickhouse/headers/libs/libstatdaemons/include/"
" -I /usr/share/clickhouse/headers/libs/libstats/include/"
" -o " << so_file_path << " " << cpp_file_path
<< " 2>&1 || echo Exit code: $?";
std::string compile_result;
{
Pipe pipe(command.str());
int pipe_fd = fileno(pipe.f);
if (-1 == pipe_fd)
throwFromErrno("Cannot fileno");
{
ReadBufferFromFileDescriptor command_output(pipe_fd);
WriteBufferFromString res(compile_result);
copyData(command_output, res);
}
}
if (!compile_result.empty())
throw Exception("Cannot compile code:\n\n" + command.str() + "\n\n" + compile_result);
/// Если до этого была ошибка, то файл с кодом остаётся для возможности просмотра.
Poco::File(cpp_file_path).remove();
{
std::lock_guard<std::mutex> lock(mutex);
libraries[hashed_key].reset(new SharedLibrary(so_file_path));
}
LOG_INFO(log, "Compiled code " << file_name);
}
}

View File

@ -634,4 +634,12 @@ Cluster & Context::getCluster(const std::string & cluster_name)
else
throw Poco::Exception("Failed to find cluster with name = " + cluster_name);
}
Compiler & Context::getCompiler()
{
return shared->compiler;
}
}

View File

@ -759,7 +759,9 @@ void InterpreterSelectQuery::executeAggregation(BlockInputStreams & streams, Exp
{
stream = maybeAsynchronous(
new ParallelAggregatingBlockInputStream(streams, key_names, aggregates, overflow_row, final,
settings.max_threads, settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode), settings.asynchronous);
settings.max_threads, settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode,
settings.compile ? &context.getCompiler() : nullptr, settings.min_count_to_compile),
settings.asynchronous);
}
else
{
@ -779,7 +781,9 @@ void InterpreterSelectQuery::executeAggregation(BlockInputStreams & streams, Exp
}
else
stream = maybeAsynchronous(new AggregatingBlockInputStream(stream, key_names, aggregates, overflow_row, final,
settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode), settings.asynchronous);
settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode,
settings.compile ? &context.getCompiler() : nullptr, settings.min_count_to_compile),
settings.asynchronous);
}

View File

@ -73,7 +73,7 @@ int main(int argc, char ** argv)
DB::DataTypes empty_list_of_types;
aggregate_descriptions[0].function = factory.get("count", empty_list_of_types);
DB::Aggregator aggregator(key_column_numbers, aggregate_descriptions, false);
DB::Aggregator aggregator(key_column_numbers, aggregate_descriptions, false, 0, DB::OverflowMode::THROW, nullptr, 0);
{
Poco::Stopwatch stopwatch;