diff --git a/dbms/include/DB/Interpreters/Compiler.h b/dbms/include/DB/Interpreters/Compiler.h new file mode 100644 index 00000000000..f89a6f28543 --- /dev/null +++ b/dbms/include/DB/Interpreters/Compiler.h @@ -0,0 +1,324 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include +#include + +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + + +/** Позволяет открыть динамическую библиотеку и получить из неё указатель на функцию. + */ +class SharedLibrary : private boost::noncopyable +{ +public: + SharedLibrary(const std::string & path) + { + handle = dlopen(path.c_str(), RTLD_LAZY); + if (!handle) + throw Exception(std::string("Cannot dlopen: ") + dlerror()); + } + + ~SharedLibrary() + { + if (handle && dlclose(handle)) + throw Exception("Cannot dlclose"); + } + + template + Func get(const std::string & name) + { + dlerror(); + + Func res = reinterpret_cast(dlsym(handle, name.c_str())); + + if (char * error = dlerror()) + throw Exception(std::string("Cannot dlsym: ") + error); + + return res; + } + +private: + void * handle; +}; + + +/** Позволяет скомпилировать кусок кода, использующий заголовочные файлы сервера, в динамическую библиотеку. + * Ведёт статистику вызовов, и инициирует компиляцию только на N-ый по счёту вызов для одного ключа. + * Компиляция выполняется асинхронно, в отдельных потоках, если есть свободные потоки. + * NOTE: Нет очистки устаревших и ненужных результатов. + */ +class Compiler +{ +public: + /** path - путь к директории с временными файлами - результатами компиляции. + * Результаты компиляции сохраняются при перезапуске сервера, + * но используют в качестве части ключа номер ревизии. То есть, устаревают при обновлении сервера. + */ + Compiler(const std::string & path_, size_t threads) + : path(path_), pool(threads) + { + Poco::DirectoryIterator dir_end; + for (Poco::DirectoryIterator dir_it(path); dir_end != dir_it; ++dir_it) + { + std::string name = dir_it.name(); + if (name.length() > strlen(".so") && 0 == name.compare(name.size() - 3, 3, ".so")) + { + files.insert(name.substr(0, name.size() - 3)); + } + } + + LOG_INFO(log, "Having " << files.size() << " compiled files from previous start."); + } + + ~Compiler() + { + LOG_DEBUG(log, "Waiting for threads to finish."); + pool.wait(); + } + + /** Увеличить счётчик для заданного ключа key на единицу. + * Если результат компиляции уже есть (уже открыт, или есть файл с библиотекой), + * то вернуть готовую SharedLibrary. + * Иначе, если счётчик достиг min_count_to_compile, + * инициировать компиляцию в отдельном потоке, если есть свободные потоки, и вернуть nullptr. + * Иначе вернуть nullptr. + */ + std::shared_ptr getOrCount( + const std::string & key, + UInt32 min_count_to_compile, + std::function get_code) + { + HashedKey hashed_key = getHash(key); + + std::lock_guard lock(mutex); + + UInt32 count = ++counts[hashed_key]; + + /// Есть готовая открытая библиотека? Или, если библиотека в процессе компиляции, там будет nullptr. + Libraries::iterator it = libraries.find(hashed_key); + if (libraries.end() != it) + { + if (!it->second) + LOG_INFO(log, "Library " << hashedKeyToFileName(hashed_key) << " is compiling."); + + return it->second; + } + + /// Есть файл с библиотекой, оставшийся от предыдущего запуска? + std::string file_name = hashedKeyToFileName(hashed_key); + if (files.count(file_name)) + return libraries.emplace(std::piecewise_construct, + std::forward_as_tuple(hashed_key), + std::forward_as_tuple(new SharedLibrary(path + '/' + file_name + ".so"))).first->second; + + /// Достигнуто ли min_count_to_compile? + if (count >= min_count_to_compile) + { + /// Есть ли свободные потоки. + if (pool.active() < pool.size()) + { + /// Обозначает, что библиотека в процессе компиляции. + libraries[hashed_key] = nullptr; + + pool.schedule([=] + { + try + { + compile(hashed_key, file_name, get_code); + } + catch (...) + { + tryLogCurrentException("Compiler"); + } + }); + } + else + LOG_INFO(log, "All threads are busy."); + } + + return nullptr; + } + + +private: + using HashedKey = UInt128; + using Counts = std::unordered_map; + using Libraries = std::unordered_map, UInt128Hash>; + using Files = std::unordered_set; + + const std::string path; + boost::threadpool::pool pool; + + /// Количество вызовов функции getOrCount. + Counts counts; + + /// Скомпилированные и открытые библиотеки. Или nullptr для библиотек в процессе компиляции. + Libraries libraries; + + /// Скомпилированные файлы, оставшиеся от предыдущих запусков, но ещё не открытые. + Files files; + + std::mutex mutex; + + Logger * log = &Logger::get("Compiler"); + + + static HashedKey getHash(const std::string & key) + { + SipHash hash; + + auto revision = Revision::get(); + hash.update(reinterpret_cast(&revision), sizeof(revision)); + hash.update(key.data(), key.size()); + + HashedKey res; + hash.get128(res.first, res.second); + return res; + } + + /// Без расширения .so. + static std::string hashedKeyToFileName(HashedKey hashed_key) + { + std::string file_name; + + { + WriteBufferFromString out(file_name); + out << hashed_key.first << '_' << hashed_key.second; + } + + return file_name; + } + + static HashedKey fileNameToHashedKey(const std::string & file_name) + { + HashedKey hashed_key; + + { + ReadBufferFromString in(file_name); + in >> hashed_key.first >> "_" >> hashed_key.second; + } + + return hashed_key; + } + + struct Pipe : private boost::noncopyable + { + FILE * f; + + Pipe(const std::string & command) + { + errno = 0; + f = popen(command.c_str(), "r"); + + if (!f) + throwFromErrno("Cannot popen"); + } + + ~Pipe() + { + try + { + errno = 0; + if (f && -1 == pclose(f)) + throwFromErrno("Cannot pclose"); + } + catch (...) + { + tryLogCurrentException("Pipe"); + } + } + }; + + void compile(HashedKey hashed_key, std::string file_name, std::function get_code) + { + LOG_INFO(log, "Compiling code " << file_name); + + std::string prefix = path + "/" + file_name; + std::string cpp_file_path = prefix + ".cpp"; + std::string so_file_path = prefix + ".so"; + + { + WriteBufferFromFile out(cpp_file_path); + out << get_code(); + } + + std::stringstream command; + + /// Слегка неудобно. + command << + "/usr/share/clickhouse/bin/clang" + " -x c++ -std=gnu++11 -O3 -g -Wall -Werror -Wnon-virtual-dtor -march=native -D NDEBUG" + " -shared -fPIC -fvisibility=hidden -fno-implement-inlines" + " -isystem /usr/share/clickhouse/headers/usr/local/include/" + " -isystem /usr/share/clickhouse/headers/usr/include/" + " -isystem /usr/share/clickhouse/headers/usr/include/c++/4.8/" + " -isystem /usr/share/clickhouse/headers/usr/include/x86_64-linux-gnu/c++/4.8/" + " -isystem /usr/share/clickhouse/headers/usr/lib/gcc/x86_64-linux-gnu/4.8/include/" + " -I /usr/share/clickhouse/headers/dbms/include/" + " -I /usr/share/clickhouse/headers/libs/libcityhash/" + " -I /usr/share/clickhouse/headers/libs/libcommon/include/" + " -I /usr/share/clickhouse/headers/libs/libdouble-conversion/src/" + " -I /usr/share/clickhouse/headers/libs/libmysqlxx/include/" + " -I /usr/share/clickhouse/headers/libs/libstatdaemons/include/" + " -I /usr/share/clickhouse/headers/libs/libstats/include/" + " -o " << so_file_path << " " << cpp_file_path + << " 2>&1 || echo Exit code: $?"; + + std::string compile_result; + + { + Pipe pipe(command.str()); + + int pipe_fd = fileno(pipe.f); + if (-1 == pipe_fd) + throwFromErrno("Cannot fileno"); + + { + ReadBufferFromFileDescriptor command_output(pipe_fd); + WriteBufferFromString res(compile_result); + + copyData(command_output, res); + } + } + + if (!compile_result.empty()) + throw Exception("Cannot compile code:\n\n" + command.str() + "\n\n" + compile_result); + + /// Если до этого была ошибка, то файл с кодом остаётся для возможности просмотра. + Poco::File(cpp_file_path).remove(); + + { + std::lock_guard lock(mutex); + libraries[hashed_key].reset(new SharedLibrary(so_file_path)); + } + + LOG_INFO(log, "Compiled code " << file_name); + } +}; + +} diff --git a/dbms/src/Interpreters/tests/compiler_test.cpp b/dbms/src/Interpreters/tests/compiler_test.cpp new file mode 100644 index 00000000000..48d6a90216d --- /dev/null +++ b/dbms/src/Interpreters/tests/compiler_test.cpp @@ -0,0 +1,30 @@ +#include +#include + +#include + + +int main(int argc, char ** argv) +{ + using namespace DB; + + Poco::AutoPtr channel = new Poco::ConsoleChannel(std::cerr); + Logger::root().setChannel(channel); + Logger::root().setLevel("trace"); + + try + { + Compiler compiler(".", 1); + + auto lib = compiler.getOrCount("xxx", 1, []() -> std::string + { + return "void f() __attribute__((__visibility__(\"default\"))); void f() {}"; + }); + } + catch (const DB::Exception & e) + { + std::cerr << e.displayText() << std::endl; + } + + return 0; +}