Merge branch 'master' of github.com:yandex/ClickHouse

This commit is contained in:
Ivan Blinkov 2019-01-28 17:34:04 +03:00
commit f74b15e777
83 changed files with 758 additions and 459 deletions

2
contrib/jemalloc vendored

@ -1 +1 @@
Subproject commit 41b7372eadee941b9164751b8d4963f915d3ceae
Subproject commit cd2931ad9bbd78208565716ab102e86d858c2fff

View File

@ -11,7 +11,7 @@
#include <Poco/File.h>
#include <Poco/Util/Application.h>
#include <Common/Stopwatch.h>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
#include <AggregateFunctions/ReservoirSampler.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <boost/program_options.hpp>

View File

@ -18,7 +18,7 @@
#include <pcg_random.hpp>
#include <common/logger_useful.h>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
#include <daemon/OwnPatternFormatter.h>
#include <Common/Exception.h>

View File

@ -17,6 +17,7 @@
#include <Common/Config/ConfigProcessor.h>
#include <Common/escapeForFileName.h>
#include <Common/ClickHouseRevision.h>
#include <Common/ThreadStatus.h>
#include <Common/config_version.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
@ -102,7 +103,7 @@ int LocalServer::main(const std::vector<std::string> & /*args*/)
try
{
Logger * log = &logger();
ThreadStatus thread_status;
UseSSL use_ssl;
if (!config().has("query") && !config().has("table-structure")) /// Nothing to process

View File

@ -23,7 +23,7 @@
#include <IO/ConnectionTimeouts.h>
#include <IO/UseSSL.h>
#include <Interpreters/Settings.h>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
#include <common/getMemoryAmount.h>
#include <Poco/AutoPtr.h>
#include <Poco/Exception.h>

View File

@ -647,6 +647,7 @@ void HTTPHandler::trySendExceptionToClient(const std::string & s, int exception_
void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{
setThreadName("HTTPHandler");
ThreadStatus thread_status;
Output used_output;

View File

@ -6,6 +6,7 @@
#include <thread>
#include <vector>
#include <Common/ProfileEvents.h>
#include <Common/ThreadPool.h>
namespace DB
@ -46,7 +47,7 @@ private:
bool quit = false;
std::mutex mutex;
std::condition_variable cond;
std::thread thread{&MetricsTransmitter::run, this};
ThreadFromGlobalPool thread{&MetricsTransmitter::run, this};
static constexpr auto profile_events_path_prefix = "ClickHouse.ProfileEvents.";
static constexpr auto current_metrics_path_prefix = "ClickHouse.Metrics.";

View File

@ -27,6 +27,7 @@
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/TaskStatsInfoGetter.h>
#include <Common/ThreadStatus.h>
#include <IO/HTTPCommon.h>
#include <IO/UseSSL.h>
#include <Interpreters/AsynchronousMetrics.h>
@ -129,9 +130,10 @@ std::string Server::getDefaultCorePath() const
int Server::main(const std::vector<std::string> & /*args*/)
{
Logger * log = &logger();
UseSSL use_ssl;
ThreadStatus thread_status;
registerFunctions();
registerAggregateFunctions();
registerTableFunctions();

View File

@ -55,6 +55,7 @@ namespace ErrorCodes
void TCPHandler::runImpl()
{
setThreadName("TCPHandler");
ThreadStatus thread_status;
connection_context = server.context();
connection_context.setSessionContext(connection_context);

View File

@ -33,7 +33,7 @@ ConfigReloader::ConfigReloader(
void ConfigReloader::start()
{
thread = std::thread(&ConfigReloader::run, this);
thread = ThreadFromGlobalPool(&ConfigReloader::run, this);
}

View File

@ -1,6 +1,7 @@
#pragma once
#include "ConfigProcessor.h"
#include <Common/ThreadPool.h>
#include <Common/ZooKeeper/Common.h>
#include <Common/ZooKeeper/ZooKeeperNodeCache.h>
#include <time.h>
@ -81,7 +82,7 @@ private:
Updater updater;
std::atomic<bool> quit{false};
std::thread thread;
ThreadFromGlobalPool thread;
/// Locked inside reloadIfNewer.
std::mutex reload_mutex;

View File

@ -2,6 +2,7 @@
#include "CurrentThread.h"
#include <common/logger_useful.h>
#include <common/likely.h>
#include <Common/ThreadStatus.h>
#include <Common/TaskStatsInfoGetter.h>
#include <Interpreters/ProcessList.h>
@ -10,11 +11,6 @@
#include <Poco/Logger.h>
#if defined(ARCADIA_ROOT)
# include <util/thread/singleton.h>
#endif
namespace DB
{
@ -23,91 +19,62 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
// Smoker's implementation to avoid thread_local usage: error: undefined symbol: __cxa_thread_atexit
#if defined(ARCADIA_ROOT)
struct ThreadStatusPtrHolder : ThreadStatusPtr
{
ThreadStatusPtrHolder() { ThreadStatusPtr::operator=(ThreadStatus::create()); }
};
struct ThreadScopePtrHolder : CurrentThread::ThreadScopePtr
{
ThreadScopePtrHolder() { CurrentThread::ThreadScopePtr::operator=(std::make_shared<CurrentThread::ThreadScope>()); }
};
# define current_thread (*FastTlsSingleton<ThreadStatusPtrHolder>())
# define current_thread_scope (*FastTlsSingleton<ThreadScopePtrHolder>())
#else
/// Order of current_thread and current_thread_scope matters
thread_local ThreadStatusPtr _current_thread = ThreadStatus::create();
thread_local CurrentThread::ThreadScopePtr _current_thread_scope = std::make_shared<CurrentThread::ThreadScope>();
# define current_thread _current_thread
# define current_thread_scope _current_thread_scope
#endif
void CurrentThread::updatePerformanceCounters()
{
get()->updatePerformanceCounters();
get().updatePerformanceCounters();
}
ThreadStatusPtr CurrentThread::get()
ThreadStatus & CurrentThread::get()
{
#ifndef NDEBUG
if (!current_thread || current_thread.use_count() <= 0)
if (unlikely(!current_thread))
throw Exception("Thread #" + std::to_string(Poco::ThreadNumber::get()) + " status was not initialized", ErrorCodes::LOGICAL_ERROR);
if (Poco::ThreadNumber::get() != current_thread->thread_number)
throw Exception("Current thread has different thread number", ErrorCodes::LOGICAL_ERROR);
#endif
return current_thread;
}
CurrentThread::ThreadScopePtr CurrentThread::getScope()
{
return current_thread_scope;
return *current_thread;
}
ProfileEvents::Counters & CurrentThread::getProfileEvents()
{
return current_thread->performance_counters;
return current_thread ? get().performance_counters : ProfileEvents::global_counters;
}
MemoryTracker & CurrentThread::getMemoryTracker()
{
return current_thread->memory_tracker;
return get().memory_tracker;
}
void CurrentThread::updateProgressIn(const Progress & value)
{
current_thread->progress_in.incrementPiecewiseAtomically(value);
get().progress_in.incrementPiecewiseAtomically(value);
}
void CurrentThread::updateProgressOut(const Progress & value)
{
current_thread->progress_out.incrementPiecewiseAtomically(value);
get().progress_out.incrementPiecewiseAtomically(value);
}
void CurrentThread::attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue)
{
get()->attachInternalTextLogsQueue(logs_queue);
get().attachInternalTextLogsQueue(logs_queue);
}
std::shared_ptr<InternalTextLogsQueue> CurrentThread::getInternalTextLogsQueue()
{
/// NOTE: this method could be called at early server startup stage
/// NOTE: this method could be called in ThreadStatus destructor, therefore we make use_count() check just in case
if (!current_thread || current_thread.use_count() <= 0)
if (!current_thread)
return nullptr;
if (current_thread->getCurrentState() == ThreadStatus::ThreadState::Died)
if (get().getCurrentState() == ThreadStatus::ThreadState::Died)
return nullptr;
return current_thread->getInternalTextLogsQueue();
return get().getInternalTextLogsQueue();
}
ThreadGroupStatusPtr CurrentThread::getGroup()
{
return get()->getThreadGroup();
if (!current_thread)
return nullptr;
return get().getThreadGroup();
}
}

View File

@ -32,7 +32,7 @@ class CurrentThread
{
public:
/// Handler to current thread
static ThreadStatusPtr get();
static ThreadStatus & get();
/// Group to which belongs current thread
static ThreadGroupStatusPtr getGroup();
@ -85,25 +85,6 @@ public:
bool log_peak_memory_usage_in_destructor = true;
};
/// Implicitly finalizes current thread in the destructor
class ThreadScope
{
public:
void (*deleter)() = nullptr;
ThreadScope() = default;
~ThreadScope()
{
if (deleter)
deleter();
/// std::terminate on exception: this is Ok.
}
};
using ThreadScopePtr = std::shared_ptr<ThreadScope>;
static ThreadScopePtr getScope();
private:
static void defaultThreadDeleter();
};

View File

@ -413,6 +413,7 @@ namespace ErrorCodes
extern const int CANNOT_CONVERT_TO_PROTOBUF_TYPE = 436;
extern const int PROTOBUF_FIELD_NOT_REPEATED = 437;
extern const int DATA_TYPE_CANNOT_BE_PROMOTED = 438;
extern const int CANNOT_SCHEDULE_TASK = 439;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -190,16 +190,19 @@ namespace CurrentMemoryTracker
{
void alloc(Int64 size)
{
if (DB::current_thread)
DB::CurrentThread::getMemoryTracker().alloc(size);
}
void realloc(Int64 old_size, Int64 new_size)
{
if (DB::current_thread)
DB::CurrentThread::getMemoryTracker().alloc(new_size - old_size);
}
void free(Int64 size)
{
if (DB::current_thread)
DB::CurrentThread::getMemoryTracker().free(size);
}
}

View File

@ -0,0 +1,235 @@
#include <Common/ThreadPool.h>
#include <Common/Exception.h>
#include <iostream>
#include <type_traits>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_SCHEDULE_TASK;
}
}
template <typename Thread>
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads)
: ThreadPoolImpl(max_threads, max_threads, max_threads)
{
}
template <typename Thread>
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads, size_t max_free_threads, size_t queue_size)
: max_threads(max_threads), max_free_threads(max_free_threads), queue_size(queue_size)
{
}
template <typename Thread>
template <typename ReturnType>
ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds)
{
auto on_error = []
{
if constexpr (std::is_same_v<ReturnType, void>)
throw DB::Exception("Cannot schedule a task", DB::ErrorCodes::CANNOT_SCHEDULE_TASK);
else
return false;
};
{
std::unique_lock lock(mutex);
auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; };
if (wait_microseconds)
{
if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred))
return on_error();
}
else
job_finished.wait(lock, pred);
if (shutdown)
return on_error();
jobs.emplace(std::move(job), priority);
++scheduled_jobs;
if (threads.size() < std::min(max_threads, scheduled_jobs))
{
threads.emplace_front();
try
{
threads.front() = Thread([this, it = threads.begin()] { worker(it); });
}
catch (...)
{
threads.pop_front();
}
}
}
new_job_or_shutdown.notify_one();
return ReturnType(true);
}
template <typename Thread>
void ThreadPoolImpl<Thread>::schedule(Job job, int priority)
{
scheduleImpl<void>(std::move(job), priority, std::nullopt);
}
template <typename Thread>
bool ThreadPoolImpl<Thread>::trySchedule(Job job, int priority, uint64_t wait_microseconds)
{
return scheduleImpl<bool>(std::move(job), priority, wait_microseconds);
}
template <typename Thread>
void ThreadPoolImpl<Thread>::scheduleOrThrow(Job job, int priority, uint64_t wait_microseconds)
{
scheduleImpl<void>(std::move(job), priority, wait_microseconds);
}
template <typename Thread>
void ThreadPoolImpl<Thread>::wait()
{
{
std::unique_lock lock(mutex);
job_finished.wait(lock, [this] { return scheduled_jobs == 0; });
if (first_exception)
{
std::exception_ptr exception;
std::swap(exception, first_exception);
std::rethrow_exception(exception);
}
}
}
template <typename Thread>
ThreadPoolImpl<Thread>::~ThreadPoolImpl()
{
finalize();
}
template <typename Thread>
void ThreadPoolImpl<Thread>::finalize()
{
{
std::unique_lock lock(mutex);
shutdown = true;
}
new_job_or_shutdown.notify_all();
for (auto & thread : threads)
thread.join();
threads.clear();
}
template <typename Thread>
size_t ThreadPoolImpl<Thread>::active() const
{
std::unique_lock lock(mutex);
return scheduled_jobs;
}
template <typename Thread>
void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_it)
{
while (true)
{
Job job;
bool need_shutdown = false;
{
std::unique_lock lock(mutex);
new_job_or_shutdown.wait(lock, [this] { return shutdown || !jobs.empty(); });
need_shutdown = shutdown;
if (!jobs.empty())
{
job = jobs.top().job;
jobs.pop();
}
else
{
return;
}
}
if (!need_shutdown)
{
try
{
job();
}
catch (...)
{
{
std::unique_lock lock(mutex);
if (!first_exception)
first_exception = std::current_exception();
shutdown = true;
--scheduled_jobs;
}
job_finished.notify_all();
new_job_or_shutdown.notify_all();
return;
}
}
{
std::unique_lock lock(mutex);
--scheduled_jobs;
if (threads.size() > scheduled_jobs + max_free_threads)
{
threads.erase(thread_it);
job_finished.notify_all();
return;
}
}
job_finished.notify_all();
}
}
template class ThreadPoolImpl<std::thread>;
template class ThreadPoolImpl<ThreadFromGlobalPool>;
void ExceptionHandler::setException(std::exception_ptr && exception)
{
std::unique_lock lock(mutex);
if (!first_exception)
first_exception = std::move(exception);
}
void ExceptionHandler::throwIfException()
{
std::unique_lock lock(mutex);
if (first_exception)
std::rethrow_exception(first_exception);
}
ThreadPool::Job createExceptionHandledJob(ThreadPool::Job job, ExceptionHandler & handler)
{
return [job{std::move(job)}, &handler] ()
{
try
{
job();
}
catch (...)
{
handler.setException(std::current_exception());
}
};
}

View File

@ -0,0 +1,203 @@
#pragma once
#include <cstdint>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <queue>
#include <list>
#include <optional>
#include <ext/singleton.h>
#include <Common/ThreadStatus.h>
/** Very simple thread pool similar to boost::threadpool.
* Advantages:
* - catches exceptions and rethrows on wait.
*
* This thread pool can be used as a task queue.
* For example, you can create a thread pool with 10 threads (and queue of size 10) and schedule 1000 tasks
* - in this case you will be blocked to keep 10 tasks in fly.
*
* Thread: std::thread or something with identical interface.
*/
template <typename Thread>
class ThreadPoolImpl
{
public:
using Job = std::function<void()>;
/// Size is constant. Up to num_threads are created on demand and then run until shutdown.
explicit ThreadPoolImpl(size_t max_threads);
/// queue_size - maximum number of running plus scheduled jobs. It can be greater than max_threads. Zero means unlimited.
ThreadPoolImpl(size_t max_threads, size_t max_free_threads, size_t queue_size);
/// Add new job. Locks until number of scheduled jobs is less than maximum or exception in one of threads was thrown.
/// If an exception in some thread was thrown, method silently returns, and exception will be rethrown only on call to 'wait' function.
/// Priority: greater is higher.
void schedule(Job job, int priority = 0);
/// Wait for specified amount of time and schedule a job or return false.
bool trySchedule(Job job, int priority = 0, uint64_t wait_microseconds = 0);
/// Wait for specified amount of time and schedule a job or throw an exception.
void scheduleOrThrow(Job job, int priority = 0, uint64_t wait_microseconds = 0);
/// Wait for all currently active jobs to be done.
/// You may call schedule and wait many times in arbitary order.
/// If any thread was throw an exception, first exception will be rethrown from this method,
/// and exception will be cleared.
void wait();
/// Waits for all threads. Doesn't rethrow exceptions (use 'wait' method to rethrow exceptions).
/// You should not destroy object while calling schedule or wait methods from another threads.
~ThreadPoolImpl();
/// Returns number of running and scheduled jobs.
size_t active() const;
private:
mutable std::mutex mutex;
std::condition_variable job_finished;
std::condition_variable new_job_or_shutdown;
const size_t max_threads;
const size_t max_free_threads;
const size_t queue_size;
size_t scheduled_jobs = 0;
bool shutdown = false;
struct JobWithPriority
{
Job job;
int priority;
JobWithPriority(Job job, int priority)
: job(job), priority(priority) {}
bool operator< (const JobWithPriority & rhs) const
{
return priority < rhs.priority;
}
};
std::priority_queue<JobWithPriority> jobs;
std::list<Thread> threads;
std::exception_ptr first_exception;
template <typename ReturnType>
ReturnType scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds);
void worker(typename std::list<Thread>::iterator thread_it);
void finalize();
};
/// ThreadPool with std::thread for threads.
using FreeThreadPool = ThreadPoolImpl<std::thread>;
/** Global ThreadPool that can be used as a singleton.
* Why it is needed?
*
* Linux can create and destroy about 100 000 threads per second (quite good).
* With simple ThreadPool (based on mutex and condvar) you can assign about 200 000 tasks per second
* - not much difference comparing to not using a thread pool at all.
*
* But if you reuse OS threads instead of creating and destroying them, several benefits exist:
* - allocator performance will usually be better due to reuse of thread local caches, especially for jemalloc:
* https://github.com/jemalloc/jemalloc/issues/1347
* - address sanitizer and thread sanitizer will not fail due to global limit on number of created threads.
* - program will work faster in gdb;
*/
class GlobalThreadPool : public FreeThreadPool, public ext::singleton<GlobalThreadPool>
{
public:
GlobalThreadPool() : FreeThreadPool(10000, 1000, 10000) {}
};
/** Looks like std::thread but allocates threads in GlobalThreadPool.
* Also holds ThreadStatus for ClickHouse.
*/
class ThreadFromGlobalPool
{
public:
ThreadFromGlobalPool() {}
template <typename Function, typename... Args>
explicit ThreadFromGlobalPool(Function && func, Args &&... args)
{
mutex = std::make_unique<std::mutex>();
/// The function object must be copyable, so we wrap lock_guard in shared_ptr.
GlobalThreadPool::instance().scheduleOrThrow([
lock = std::make_shared<std::lock_guard<std::mutex>>(*mutex),
func = std::forward<Function>(func),
args = std::make_tuple(std::forward<Args>(args)...)]
{
DB::ThreadStatus thread_status;
std::apply(func, args);
});
}
ThreadFromGlobalPool(ThreadFromGlobalPool && rhs)
{
*this = std::move(rhs);
}
ThreadFromGlobalPool & operator=(ThreadFromGlobalPool && rhs)
{
if (mutex)
std::terminate();
mutex = std::move(rhs.mutex);
return *this;
}
~ThreadFromGlobalPool()
{
if (mutex)
std::terminate();
}
void join()
{
{
std::lock_guard lock(*mutex);
}
mutex.reset();
}
bool joinable() const
{
return static_cast<bool>(mutex);
}
private:
std::unique_ptr<std::mutex> mutex; /// Object must be moveable.
};
/// Recommended thread pool for the case when multiple thread pools are created and destroyed.
using ThreadPool = ThreadPoolImpl<ThreadFromGlobalPool>;
/// Allows to save first catched exception in jobs and postpone its rethrow.
class ExceptionHandler
{
public:
void setException(std::exception_ptr && exception);
void throwIfException();
private:
std::exception_ptr first_exception;
std::mutex mutex;
};
ThreadPool::Job createExceptionHandledJob(ThreadPool::Job job, ExceptionHandler & handler);

View File

@ -21,10 +21,13 @@ namespace ErrorCodes
}
thread_local ThreadStatusPtr current_thread = nullptr;
TasksStatsCounters TasksStatsCounters::current()
{
TasksStatsCounters res;
CurrentThread::get()->taskstats_getter->getStat(res.stat, CurrentThread::get()->os_thread_id);
CurrentThread::get().taskstats_getter->getStat(res.stat, CurrentThread::get().os_thread_id);
return res;
}
@ -39,17 +42,19 @@ ThreadStatus::ThreadStatus()
memory_tracker.setDescription("(for thread)");
log = &Poco::Logger::get("ThreadStatus");
current_thread = this;
/// NOTE: It is important not to do any non-trivial actions (like updating ProfileEvents or logging) before ThreadStatus is created
/// Otherwise it could lead to SIGSEGV due to current_thread dereferencing
}
ThreadStatusPtr ThreadStatus::create()
ThreadStatus::~ThreadStatus()
{
return ThreadStatusPtr(new ThreadStatus);
if (deleter)
deleter();
current_thread = nullptr;
}
ThreadStatus::~ThreadStatus() = default;
void ThreadStatus::initPerformanceCounters()
{
performance_counters_finalized = false;

View File

@ -9,6 +9,8 @@
#include <map>
#include <mutex>
#include <shared_mutex>
#include <functional>
#include <boost/noncopyable.hpp>
namespace Poco
@ -23,7 +25,7 @@ namespace DB
class Context;
class QueryStatus;
class ThreadStatus;
using ThreadStatusPtr = std::shared_ptr<ThreadStatus>;
using ThreadStatusPtr = ThreadStatus*;
class QueryThreadLog;
struct TasksStatsCounters;
struct RUsageCounters;
@ -67,14 +69,20 @@ public:
using ThreadGroupStatusPtr = std::shared_ptr<ThreadGroupStatus>;
extern thread_local ThreadStatusPtr current_thread;
/** Encapsulates all per-thread info (ProfileEvents, MemoryTracker, query_id, query context, etc.).
* Used inside thread-local variable. See variables in CurrentThread.cpp
* The object must be created in thread function and destroyed in the same thread before the exit.
* It is accessed through thread-local pointer.
*
* This object should be used only via "CurrentThread", see CurrentThread.h
*/
class ThreadStatus : public std::enable_shared_from_this<ThreadStatus>
class ThreadStatus : public boost::noncopyable
{
public:
ThreadStatus();
~ThreadStatus();
/// Poco's thread number (the same number is used in logs)
UInt32 thread_number = 0;
/// Linux's PID (or TGID) (the same id is shown by ps util)
@ -88,8 +96,8 @@ public:
Progress progress_in;
Progress progress_out;
public:
static ThreadStatusPtr create();
using Deleter = std::function<void()>;
Deleter deleter;
ThreadGroupStatusPtr getThreadGroup() const
{
@ -136,11 +144,7 @@ public:
/// Detaches thread from the thread group and the query, dumps performance counters if they have not been dumped
void detachQuery(bool exit_if_already_detached = false, bool thread_exits = false);
~ThreadStatus();
protected:
ThreadStatus();
void initPerformanceCounters();
void logToQueryThreadLog(QueryThreadLog & thread_log);

View File

@ -853,8 +853,8 @@ ZooKeeper::ZooKeeper(
if (!auth_scheme.empty())
sendAuth(auth_scheme, auth_data);
send_thread = std::thread([this] { sendThread(); });
receive_thread = std::thread([this] { receiveThread(); });
send_thread = ThreadFromGlobalPool([this] { sendThread(); });
receive_thread = ThreadFromGlobalPool([this] { receiveThread(); });
ProfileEvents::increment(ProfileEvents::ZooKeeperInit);
}

View File

@ -3,6 +3,7 @@
#include <Core/Types.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/CurrentMetrics.h>
#include <Common/ThreadPool.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <IO/ReadBuffer.h>
@ -209,8 +210,8 @@ private:
Watches watches;
std::mutex watches_mutex;
std::thread send_thread;
std::thread receive_thread;
ThreadFromGlobalPool send_thread;
ThreadFromGlobalPool receive_thread;
void connect(
const Addresses & addresses,

View File

@ -53,6 +53,13 @@ target_link_libraries (thread_creation_latency PRIVATE clickhouse_common_io)
add_executable (thread_pool thread_pool.cpp)
target_link_libraries (thread_pool PRIVATE clickhouse_common_io)
add_executable (thread_pool_2 thread_pool_2.cpp)
target_link_libraries (thread_pool_2 PRIVATE clickhouse_common_io)
add_executable (multi_version multi_version.cpp)
target_link_libraries (multi_version PRIVATE clickhouse_common_io)
add_check(multi_version)
add_executable (array_cache array_cache.cpp)
target_link_libraries (array_cache PRIVATE clickhouse_common_io)

View File

@ -8,7 +8,7 @@
#include <Common/RWLock.h>
#include <Common/Stopwatch.h>
#include <common/Types.h>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
#include <random>
#include <pcg_random.hpp>
#include <thread>

View File

@ -1,8 +1,8 @@
#include <string.h>
#include <iostream>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
#include <functional>
#include <common/MultiVersion.h>
#include <Common/MultiVersion.h>
#include <Poco/Exception.h>
@ -23,7 +23,7 @@ void thread2(MV & x, const char * result)
}
int main(int argc, char ** argv)
int main(int, char **)
{
try
{

View File

@ -16,7 +16,7 @@
#include <Compression/CompressedReadBuffer.h>
#include <Common/Stopwatch.h>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
using Key = UInt64;

View File

@ -16,7 +16,7 @@
#include <Compression/CompressedReadBuffer.h>
#include <Common/Stopwatch.h>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
using Key = UInt64;

View File

@ -5,7 +5,7 @@
#include <Common/Stopwatch.h>
#include <Common/Exception.h>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
int x = 0;

View File

@ -1,4 +1,4 @@
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
/** Reproduces bug in ThreadPool.
* It get stuck if we call 'wait' many times from many other threads simultaneously.

View File

@ -0,0 +1,21 @@
#include <atomic>
#include <iostream>
#include <Common/ThreadPool.h>
int main(int, char **)
{
std::atomic<size_t> res{0};
for (size_t i = 0; i < 1000; ++i)
{
size_t threads = 16;
ThreadPool pool(threads);
for (size_t j = 0; j < threads; ++j)
pool.schedule([&]{ ++res; });
pool.wait();
}
std::cerr << res << "\n";
return 0;
}

View File

@ -161,9 +161,9 @@ BackgroundSchedulePool::BackgroundSchedulePool(size_t size)
threads.resize(size);
for (auto & thread : threads)
thread = std::thread([this] { threadFunction(); });
thread = ThreadFromGlobalPool([this] { threadFunction(); });
delayed_thread = std::thread([this] { delayExecutionThreadFunction(); });
delayed_thread = ThreadFromGlobalPool([this] { delayExecutionThreadFunction(); });
}
@ -181,7 +181,7 @@ BackgroundSchedulePool::~BackgroundSchedulePool()
delayed_thread.join();
LOG_TRACE(&Logger::get("BackgroundSchedulePool"), "Waiting for threads to finish.");
for (std::thread & thread : threads)
for (auto & thread : threads)
thread.join();
}
catch (...)

View File

@ -13,6 +13,8 @@
#include <boost/noncopyable.hpp>
#include <Common/ZooKeeper/Types.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadPool.h>
namespace DB
{
@ -119,7 +121,7 @@ public:
~BackgroundSchedulePool();
private:
using Threads = std::vector<std::thread>;
using Threads = std::vector<ThreadFromGlobalPool>;
void threadFunction();
void delayExecutionThreadFunction();
@ -141,7 +143,7 @@ private:
std::condition_variable wakeup_cond;
std::mutex delayed_tasks_mutex;
/// Thread waiting for next delayed task.
std::thread delayed_thread;
ThreadFromGlobalPool delayed_thread;
/// Tasks ordered by scheduled time.
DelayedTasks delayed_tasks;

View File

@ -35,7 +35,7 @@ void AsynchronousBlockInputStream::next()
{
ready.reset();
pool.schedule([this, thread_group=CurrentThread::getGroup()] ()
pool.schedule([this, thread_group = CurrentThread::getGroup()] ()
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};

View File

@ -5,7 +5,7 @@
#include <DataStreams/IBlockInputStream.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
#include <Common/MemoryTracker.h>
#include <Poco/Ext/ThreadNumber.h>

View File

@ -195,7 +195,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::start()
*/
for (size_t i = 0; i < merging_threads; ++i)
pool.schedule([this, thread_group=CurrentThread::getGroup()] () { mergeThread(thread_group); });
pool.schedule([this, thread_group = CurrentThread::getGroup()] () { mergeThread(thread_group); });
}
}

View File

@ -4,7 +4,7 @@
#include <DataStreams/IBlockInputStream.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/CurrentThread.h>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
#include <condition_variable>

View File

@ -13,6 +13,7 @@
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTracker.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadPool.h>
/** Allows to process multiple block input streams (sources) in parallel, using specified number of threads.
@ -303,8 +304,8 @@ private:
Handler & handler;
/// Streams.
using ThreadsData = std::vector<std::thread>;
/// Threads.
using ThreadsData = std::vector<ThreadFromGlobalPool>;
ThreadsData threads;
/** A set of available sources that are not currently processed by any thread.

View File

@ -5,7 +5,7 @@
#include <Common/CurrentThread.h>
#include <Common/setThreadName.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
namespace DB

View File

@ -1,6 +1,6 @@
#pragma once
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
#include <Databases/IDatabase.h>

View File

@ -11,7 +11,7 @@
#include <Common/escapeForFileName.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/Stopwatch.h>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserCreateQuery.h>

View File

@ -8,12 +8,10 @@
#include <functional>
#include <Poco/File.h>
#include <Common/escapeForFileName.h>
#include <Common/ThreadPool.h>
#include <Interpreters/Context.h>
class ThreadPool;
namespace DB
{

View File

@ -6,6 +6,7 @@
#include <DataStreams/OwningBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Common/ShellCommand.h>
#include <Common/ThreadPool.h>
#include <common/logger_useful.h>
#include "DictionarySourceFactory.h"
#include "DictionarySourceHelpers.h"
@ -165,7 +166,7 @@ namespace
BlockInputStreamPtr stream;
std::unique_ptr<ShellCommand> command;
std::packaged_task<void()> task;
std::thread thread;
ThreadFromGlobalPool thread;
bool wait_called = false;
};

View File

@ -8,6 +8,7 @@
#include <mutex>
#include <map>
#include <IO/AIO.h>
#include <Common/ThreadPool.h>
namespace DB
@ -32,7 +33,7 @@ class AIOContextPool : public ext::singleton<AIOContextPool>
std::map<ID, std::promise<BytesRead>> promises;
std::atomic<bool> cancelled{false};
std::thread io_completion_monitor{&AIOContextPool::doMonitor, this};
ThreadFromGlobalPool io_completion_monitor{&AIOContextPool::doMonitor, this};
~AIOContextPool();

View File

@ -4,7 +4,7 @@
#include <vector>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
#include <IO/WriteBuffer.h>

View File

@ -12,7 +12,7 @@
#include <Common/Arena.h>
#include <Common/HashTable/HashMap.h>
#include <Common/HashTable/TwoLevelHashMap.h>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
#include <Common/UInt128.h>
#include <Common/LRUCache.h>

View File

@ -5,6 +5,7 @@
#include <condition_variable>
#include <unordered_map>
#include <string>
#include <Common/ThreadPool.h>
namespace DB
@ -43,7 +44,7 @@ private:
Container container;
mutable std::mutex container_mutex;
std::thread thread;
ThreadFromGlobalPool thread;
void run();
void update();

View File

@ -142,9 +142,6 @@ SharedLibraryPtr Compiler::getOrCount(
{
/// The min_count_to_compile value of zero indicates the need for synchronous compilation.
/// Are there any free threads?
if (min_count_to_compile == 0 || pool.active() < pool.size())
{
/// Indicates that the library is in the process of compiling.
libraries[hashed_key] = nullptr;
@ -161,7 +158,7 @@ SharedLibraryPtr Compiler::getOrCount(
}
else
{
pool.schedule([=]
bool res = pool.trySchedule([=]
{
try
{
@ -172,11 +169,11 @@ SharedLibraryPtr Compiler::getOrCount(
tryLogCurrentException("Compiler");
}
});
}
}
else
if (!res)
LOG_INFO(log, "All threads are busy.");
}
}
return nullptr;
}

View File

@ -15,7 +15,7 @@
#include <Common/Exception.h>
#include <Common/UInt128.h>
#include <Common/SharedLibrary.h>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
namespace DB
{

View File

@ -10,8 +10,9 @@
#include <optional>
#include <Common/config.h>
#include <common/MultiVersion.h>
#include <Common/MultiVersion.h>
#include <Common/LRUCache.h>
#include <Common/ThreadPool.h>
#include <Core/Types.h>
#include <Core/NamesAndTypes.h>
#include <Core/Block.h>
@ -521,7 +522,7 @@ private:
std::mutex mutex;
std::condition_variable cond;
std::atomic<bool> quit{false};
std::thread thread{&SessionCleaner::run, this};
ThreadFromGlobalPool thread{&SessionCleaner::run, this};
};
}

View File

@ -241,7 +241,7 @@ DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_, const
event_queue_updated = std::make_shared<Poco::Event>();
thread = std::thread(&DDLWorker::run, this);
thread = ThreadFromGlobalPool(&DDLWorker::run, this);
}

View File

@ -3,6 +3,7 @@
#include <Interpreters/Cluster.h>
#include <DataStreams/BlockIO.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadPool.h>
#include <common/logger_useful.h>
#include <atomic>
@ -90,7 +91,7 @@ private:
std::shared_ptr<Poco::Event> event_queue_updated;
std::atomic<bool> stop_flag{false};
std::thread thread;
ThreadFromGlobalPool thread;
Int64 last_cleanup_time_seconds = 0;

View File

@ -150,7 +150,7 @@ EmbeddedDictionaries::EmbeddedDictionaries(
, reload_period(context_.getConfigRef().getInt("builtin_dictionaries_reload_interval", 3600))
{
reloadImpl(throw_on_error);
reloading_thread = std::thread([this] { reloadPeriodically(); });
reloading_thread = ThreadFromGlobalPool([this] { reloadPeriodically(); });
}

View File

@ -2,7 +2,8 @@
#include <thread>
#include <functional>
#include <common/MultiVersion.h>
#include <Common/MultiVersion.h>
#include <Common/ThreadPool.h>
#include <Poco/Event.h>
@ -41,7 +42,7 @@ private:
mutable std::mutex mutex;
std::thread reloading_thread;
ThreadFromGlobalPool reloading_thread;
Poco::Event destroy;

View File

@ -72,7 +72,7 @@ void ExternalLoader::init(bool throw_on_error)
reloadAndUpdate(throw_on_error);
}
reloading_thread = std::thread{&ExternalLoader::reloadPeriodically, this};
reloading_thread = ThreadFromGlobalPool{&ExternalLoader::reloadPeriodically, this};
}

View File

@ -13,6 +13,7 @@
#include <Core/Types.h>
#include <pcg_random.hpp>
#include <Common/randomSeed.h>
#include <Common/ThreadPool.h>
namespace DB
@ -160,7 +161,7 @@ private:
std::unique_ptr<IExternalLoaderConfigRepository> config_repository;
std::thread reloading_thread;
ThreadFromGlobalPool reloading_thread;
Poco::Event destroy;
Logger * log;

View File

@ -2,12 +2,12 @@
#include <Interpreters/IInterpreter.h>
#include <Storages/ColumnsDescription.h>
#include <Common/ThreadPool.h>
class ThreadPool;
namespace DB
{
class Context;
class ASTCreateQuery;
class ASTExpressionList;

View File

@ -4,7 +4,7 @@
#include <Common/config.h>
#include <Common/typeid_cast.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExternalDictionaries.h>
#include <Interpreters/EmbeddedDictionaries.h>

View File

@ -18,6 +18,7 @@
#include <Interpreters/InterpreterRenameQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Common/setThreadName.h>
#include <Common/ThreadPool.h>
#include <IO/WriteHelpers.h>
#include <common/logger_useful.h>
#include <Poco/Util/AbstractConfiguration.h>
@ -135,7 +136,7 @@ protected:
/** In this thread, data is pulled from 'queue' and stored in 'data', and then written into table.
*/
std::thread saving_thread;
ThreadFromGlobalPool saving_thread;
void threadFunction();
@ -161,7 +162,7 @@ SystemLog<LogElement>::SystemLog(Context & context_,
log = &Logger::get("SystemLog (" + database_name + "." + table_name + ")");
data.reserve(DBMS_SYSTEM_LOG_QUEUE_SIZE);
saving_thread = std::thread([this] { threadFunction(); });
saving_thread = ThreadFromGlobalPool([this] { threadFunction(); });
}

View File

@ -36,8 +36,7 @@ String ThreadStatus::getQueryID()
void CurrentThread::defaultThreadDeleter()
{
ThreadStatus & thread = *CurrentThread::get();
LOG_TRACE(thread.log, "Thread " << thread.thread_number << " exited");
ThreadStatus & thread = CurrentThread::get();
thread.detachQuery(true, true);
}
@ -51,8 +50,8 @@ void ThreadStatus::initializeQuery()
memory_tracker.setParent(&thread_group->memory_tracker);
thread_group->memory_tracker.setDescription("(for query)");
thread_group->master_thread = shared_from_this();
thread_group->thread_statuses.emplace(thread_number, shared_from_this());
thread_group->master_thread = this;
thread_group->thread_statuses.emplace(thread_number, this);
initPerformanceCounters();
thread_state = ThreadState::AttachedToQuery;
@ -87,8 +86,8 @@ void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool
if (!global_context)
global_context = thread_group->global_context;
if (!thread_group->thread_statuses.emplace(thread_number, shared_from_this()).second)
throw Exception("Thread " + std::to_string(thread_number) + " is attached twice", ErrorCodes::LOGICAL_ERROR);
/// NOTE: A thread may be attached multiple times if it is reused from a thread pool.
thread_group->thread_statuses.emplace(thread_number, this);
}
initPerformanceCounters();
@ -193,48 +192,47 @@ void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log)
void CurrentThread::initializeQuery()
{
get()->initializeQuery();
getScope()->deleter = CurrentThread::defaultThreadDeleter;
get().initializeQuery();
get().deleter = CurrentThread::defaultThreadDeleter;
}
void CurrentThread::attachTo(const ThreadGroupStatusPtr & thread_group)
{
get()->attachQuery(thread_group, true);
getScope()->deleter = CurrentThread::defaultThreadDeleter;
get().attachQuery(thread_group, true);
get().deleter = CurrentThread::defaultThreadDeleter;
}
void CurrentThread::attachToIfDetached(const ThreadGroupStatusPtr & thread_group)
{
get()->attachQuery(thread_group, false);
getScope()->deleter = CurrentThread::defaultThreadDeleter;
get().attachQuery(thread_group, false);
get().deleter = CurrentThread::defaultThreadDeleter;
}
std::string CurrentThread::getCurrentQueryID()
{
if (!get() || get().use_count() <= 0)
if (!current_thread)
return {};
return get()->getQueryID();
return get().getQueryID();
}
void CurrentThread::attachQueryContext(Context & query_context)
{
return get()->attachQueryContext(query_context);
return get().attachQueryContext(query_context);
}
void CurrentThread::finalizePerformanceCounters()
{
get()->finalizePerformanceCounters();
get().finalizePerformanceCounters();
}
void CurrentThread::detachQuery()
{
get()->detachQuery(false);
get().detachQuery(false);
}
void CurrentThread::detachQueryIfNotDetached()
{
get()->detachQuery(true);
get().detachQuery(true);
}

View File

@ -2,7 +2,7 @@
#include <thread>
#include <future>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/FileStream.h>

View File

@ -1,4 +1,4 @@
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>

View File

@ -1,6 +1,7 @@
#pragma once
#include <Storages/StorageDistributed.h>
#include <Common/ThreadPool.h>
#include <atomic>
#include <thread>
@ -55,7 +56,7 @@ private:
std::mutex mutex;
std::condition_variable cond;
Logger * log;
std::thread thread {&StorageDistributedDirectoryMonitor::run, this};
ThreadFromGlobalPool thread{&StorageDistributedDirectoryMonitor::run, this};
};
}

View File

@ -4,7 +4,7 @@
#include <DataStreams/IBlockOutputStream.h>
#include <Core/Block.h>
#include <Common/Throttler.h>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
#include <atomic>
#include <memory>
#include <chrono>

View File

@ -67,7 +67,7 @@ BackgroundProcessingPool::BackgroundProcessingPool(int size_) : size(size_)
threads.resize(size);
for (auto & thread : threads)
thread = std::thread([this] { threadFunction(); });
thread = ThreadFromGlobalPool([this] { threadFunction(); });
}
@ -110,7 +110,7 @@ BackgroundProcessingPool::~BackgroundProcessingPool()
{
shutdown = true;
wake_event.notify_all();
for (std::thread & thread : threads)
for (auto & thread : threads)
thread.join();
}
catch (...)

View File

@ -13,6 +13,8 @@
#include <Poco/Timestamp.h>
#include <Core/Types.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadPool.h>
namespace DB
@ -60,7 +62,7 @@ protected:
friend class BackgroundProcessingPoolTaskInfo;
using Tasks = std::multimap<Poco::Timestamp, TaskHandle>; /// key is desired next time to execute (priority).
using Threads = std::vector<std::thread>;
using Threads = std::vector<ThreadFromGlobalPool>;
const size_t size;

View File

@ -420,7 +420,7 @@ void StorageBuffer::startup()
<< " Set apropriate system_profile to fix this.");
}
flush_thread = std::thread(&StorageBuffer::flushThread, this);
flush_thread = ThreadFromGlobalPool(&StorageBuffer::flushThread, this);
}

View File

@ -4,6 +4,7 @@
#include <thread>
#include <ext/shared_ptr_helper.h>
#include <Core/NamesAndTypes.h>
#include <Common/ThreadPool.h>
#include <Storages/IStorage.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Poco/Event.h>
@ -110,7 +111,7 @@ private:
Poco::Event shutdown_event;
/// Resets data by timeout.
std::thread flush_thread;
ThreadFromGlobalPool flush_thread;
void flushAllBuffers(bool check_thresholds = true);
/// Reset the buffer. If check_thresholds is set - resets only if thresholds are exceeded.

View File

@ -2,7 +2,7 @@
#include <Storages/IStorage.h>
#include <Core/Defines.h>
#include <common/MultiVersion.h>
#include <Common/MultiVersion.h>
#include <ext/shared_ptr_helper.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>

View File

@ -45,7 +45,7 @@
#include <Poco/DirectoryIterator.h>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
#include <ext/range.h>
#include <ext/scope_guard.h>

View File

@ -0,0 +1,51 @@
# Anonymized Yandex.Metrica Data
Dataset consists of two tables containing anonymized data about hits (`hits_v1`) and visits (`visits_v1`) of Yandex.Metrica. Each of the tables can be downloaded as a compressed `tsv.xz` file or as prepared partitions.
## Obtaining Tables from Prepared Partitions
**Download and import hits:**
```bash
curl -O https://clickhouse-datasets.s3.yandex.net/hits/partitions/hits_v1.tar
tar xvf hits_v1.tar -C /var/lib/clickhouse # path to ClickHouse data directory
# check permissions on unpacked data, fix if required
sudo service clickhouse-server restart
clickhouse-client --query "SELECT COUNT(*) FROM datasets.hits_v1"
```
**Download and import visits:**
```bash
curl -O https://clickhouse-datasets.s3.yandex.net/visits/partitions/visits_v1.tar
tar xvf visits_v1.tar -C /var/lib/clickhouse # path to ClickHouse data directory
# check permissions on unpacked data, fix if required
sudo service clickhouse-server restart
clickhouse-client --query "SELECT COUNT(*) FROM datasets.visits_v1"
```
## Obtaining Tables from Compressed tsv-file
**Download and import hits from compressed tsv-file**
```bash
curl https://clickhouse-datasets.s3.yandex.net/hits/tsv/hits_v1.tsv.xz | unxz --threads=`nproc` > hits_v1.tsv
# now create table
clickhouse-client --query "CREATE DATABASE IF NOT EXISTS datasets"
clickhouse-client --query "CREATE TABLE datasets.hits_v1 ( WatchID UInt64, JavaEnable UInt8, Title String, GoodEvent Int16, EventTime DateTime, EventDate Date, CounterID UInt32, ClientIP UInt32, ClientIP6 FixedString(16), RegionID UInt32, UserID UInt64, CounterClass Int8, OS UInt8, UserAgent UInt8, URL String, Referer String, URLDomain String, RefererDomain String, Refresh UInt8, IsRobot UInt8, RefererCategories Array(UInt16), URLCategories Array(UInt16), URLRegions Array(UInt32), RefererRegions Array(UInt32), ResolutionWidth UInt16, ResolutionHeight UInt16, ResolutionDepth UInt8, FlashMajor UInt8, FlashMinor UInt8, FlashMinor2 String, NetMajor UInt8, NetMinor UInt8, UserAgentMajor UInt16, UserAgentMinor FixedString(2), CookieEnable UInt8, JavascriptEnable UInt8, IsMobile UInt8, MobilePhone UInt8, MobilePhoneModel String, Params String, IPNetworkID UInt32, TraficSourceID Int8, SearchEngineID UInt16, SearchPhrase String, AdvEngineID UInt8, IsArtifical UInt8, WindowClientWidth UInt16, WindowClientHeight UInt16, ClientTimeZone Int16, ClientEventTime DateTime, SilverlightVersion1 UInt8, SilverlightVersion2 UInt8, SilverlightVersion3 UInt32, SilverlightVersion4 UInt16, PageCharset String, CodeVersion UInt32, IsLink UInt8, IsDownload UInt8, IsNotBounce UInt8, FUniqID UInt64, HID UInt32, IsOldCounter UInt8, IsEvent UInt8, IsParameter UInt8, DontCountHits UInt8, WithHash UInt8, HitColor FixedString(1), UTCEventTime DateTime, Age UInt8, Sex UInt8, Income UInt8, Interests UInt16, Robotness UInt8, GeneralInterests Array(UInt16), RemoteIP UInt32, RemoteIP6 FixedString(16), WindowName Int32, OpenerName Int32, HistoryLength Int16, BrowserLanguage FixedString(2), BrowserCountry FixedString(2), SocialNetwork String, SocialAction String, HTTPError UInt16, SendTiming Int32, DNSTiming Int32, ConnectTiming Int32, ResponseStartTiming Int32, ResponseEndTiming Int32, FetchTiming Int32, RedirectTiming Int32, DOMInteractiveTiming Int32, DOMContentLoadedTiming Int32, DOMCompleteTiming Int32, LoadEventStartTiming Int32, LoadEventEndTiming Int32, NSToDOMContentLoadedTiming Int32, FirstPaintTiming Int32, RedirectCount Int8, SocialSourceNetworkID UInt8, SocialSourcePage String, ParamPrice Int64, ParamOrderID String, ParamCurrency FixedString(3), ParamCurrencyID UInt16, GoalsReached Array(UInt32), OpenstatServiceName String, OpenstatCampaignID String, OpenstatAdID String, OpenstatSourceID String, UTMSource String, UTMMedium String, UTMCampaign String, UTMContent String, UTMTerm String, FromTag String, HasGCLID UInt8, RefererHash UInt64, URLHash UInt64, CLID UInt32, YCLID UInt64, ShareService String, ShareURL String, ShareTitle String, ParsedParams Nested(Key1 String, Key2 String, Key3 String, Key4 String, Key5 String, ValueDouble Float64), IslandID FixedString(16), RequestNum UInt32, RequestTry UInt8) ENGINE = MergeTree() PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID) SETTINGS index_granularity = 8192"
# import data
cat hits_v1.tsv | clickhouse-client --query "INSERT INTO datasets.hits_v1 FORMAT TSV" --max_insert_block_size=100000
# optionally you can optimize table
clickhouse-client --query "OPTIMIZE TABLE datasets.hits_v1 FINAL"
clickhouse-client --query "SELECT COUNT(*) FROM datasets.hits_v1"
```
**Download and import visits from compressed tsv-file**
```bash
curl https://clickhouse-datasets.s3.yandex.net/visits/tsv/visits_v1.tsv.xz | unxz --threads=`nproc` > visits_v1.tsv
# now create table
clickhouse-client --query "CREATE DATABASE IF NOT EXISTS datasets"
clickhouse-client --query "CREATE TABLE datasets.visits_v1 ( CounterID UInt32, StartDate Date, Sign Int8, IsNew UInt8, VisitID UInt64, UserID UInt64, StartTime DateTime, Duration UInt32, UTCStartTime DateTime, PageViews Int32, Hits Int32, IsBounce UInt8, Referer String, StartURL String, RefererDomain String, StartURLDomain String, EndURL String, LinkURL String, IsDownload UInt8, TraficSourceID Int8, SearchEngineID UInt16, SearchPhrase String, AdvEngineID UInt8, PlaceID Int32, RefererCategories Array(UInt16), URLCategories Array(UInt16), URLRegions Array(UInt32), RefererRegions Array(UInt32), IsYandex UInt8, GoalReachesDepth Int32, GoalReachesURL Int32, GoalReachesAny Int32, SocialSourceNetworkID UInt8, SocialSourcePage String, MobilePhoneModel String, ClientEventTime DateTime, RegionID UInt32, ClientIP UInt32, ClientIP6 FixedString(16), RemoteIP UInt32, RemoteIP6 FixedString(16), IPNetworkID UInt32, SilverlightVersion3 UInt32, CodeVersion UInt32, ResolutionWidth UInt16, ResolutionHeight UInt16, UserAgentMajor UInt16, UserAgentMinor UInt16, WindowClientWidth UInt16, WindowClientHeight UInt16, SilverlightVersion2 UInt8, SilverlightVersion4 UInt16, FlashVersion3 UInt16, FlashVersion4 UInt16, ClientTimeZone Int16, OS UInt8, UserAgent UInt8, ResolutionDepth UInt8, FlashMajor UInt8, FlashMinor UInt8, NetMajor UInt8, NetMinor UInt8, MobilePhone UInt8, SilverlightVersion1 UInt8, Age UInt8, Sex UInt8, Income UInt8, JavaEnable UInt8, CookieEnable UInt8, JavascriptEnable UInt8, IsMobile UInt8, BrowserLanguage UInt16, BrowserCountry UInt16, Interests UInt16, Robotness UInt8, GeneralInterests Array(UInt16), Params Array(String), Goals Nested(ID UInt32, Serial UInt32, EventTime DateTime, Price Int64, OrderID String, CurrencyID UInt32), WatchIDs Array(UInt64), ParamSumPrice Int64, ParamCurrency FixedString(3), ParamCurrencyID UInt16, ClickLogID UInt64, ClickEventID Int32, ClickGoodEvent Int32, ClickEventTime DateTime, ClickPriorityID Int32, ClickPhraseID Int32, ClickPageID Int32, ClickPlaceID Int32, ClickTypeID Int32, ClickResourceID Int32, ClickCost UInt32, ClickClientIP UInt32, ClickDomainID UInt32, ClickURL String, ClickAttempt UInt8, ClickOrderID UInt32, ClickBannerID UInt32, ClickMarketCategoryID UInt32, ClickMarketPP UInt32, ClickMarketCategoryName String, ClickMarketPPName String, ClickAWAPSCampaignName String, ClickPageName String, ClickTargetType UInt16, ClickTargetPhraseID UInt64, ClickContextType UInt8, ClickSelectType Int8, ClickOptions String, ClickGroupBannerID Int32, OpenstatServiceName String, OpenstatCampaignID String, OpenstatAdID String, OpenstatSourceID String, UTMSource String, UTMMedium String, UTMCampaign String, UTMContent String, UTMTerm String, FromTag String, HasGCLID UInt8, FirstVisit DateTime, PredLastVisit Date, LastVisit Date, TotalVisits UInt32, TraficSource Nested(ID Int8, SearchEngineID UInt16, AdvEngineID UInt8, PlaceID UInt16, SocialSourceNetworkID UInt8, Domain String, SearchPhrase String, SocialSourcePage String), Attendance FixedString(16), CLID UInt32, YCLID UInt64, NormalizedRefererHash UInt64, SearchPhraseHash UInt64, RefererDomainHash UInt64, NormalizedStartURLHash UInt64, StartURLDomainHash UInt64, NormalizedEndURLHash UInt64, TopLevelDomain UInt64, URLScheme UInt64, OpenstatServiceNameHash UInt64, OpenstatCampaignIDHash UInt64, OpenstatAdIDHash UInt64, OpenstatSourceIDHash UInt64, UTMSourceHash UInt64, UTMMediumHash UInt64, UTMCampaignHash UInt64, UTMContentHash UInt64, UTMTermHash UInt64, FromHash UInt64, WebVisorEnabled UInt8, WebVisorActivity UInt32, ParsedParams Nested(Key1 String, Key2 String, Key3 String, Key4 String, Key5 String, ValueDouble Float64), Market Nested(Type UInt8, GoalID UInt32, OrderID String, OrderPrice Int64, PP UInt32, DirectPlaceID UInt32, DirectOrderID UInt32, DirectBannerID UInt32, GoodID String, GoodName String, GoodQuantity Int32, GoodPrice Int64), IslandID FixedString(16)) ENGINE = CollapsingMergeTree(StartDate, intHash32(UserID), (CounterID, StartDate, intHash32(UserID), VisitID), 8192, Sign)"
# import data
cat visits_v1.tsv | clickhouse-client --query "INSERT INTO datasets.visits_v1 FORMAT TSV" --max_insert_block_size=100000
# optionally you can optimize table
clickhouse-client --query "OPTIMIZE TABLE datasets.visits_v1 FINAL"
clickhouse-client --query "SELECT COUNT(*) FROM datasets.visits_v1"
```
## Queries
Examples of queries to these tables (they are named `test.hits` and `test.visits`) can be found among [stateful tests](https://github.com/yandex/ClickHouse/tree/master/dbms/tests/queries/1_stateful) and in some [performance tests](https://github.com/yandex/ClickHouse/tree/master/dbms/tests/performance/test_hits) of ClickHouse.

View File

@ -0,0 +1,51 @@
# Анонимизированные данные Яндекс.Метрики
Датасет состоит из двух таблиц, содержащих анонимизированные данные о хитах (`hits_v1`) и визитах (`visits_v1`) Яндекс.Метрики. Каждую из таблиц можно скачать в виде сжатого `.tsv.xz`-файла или в виде уже готовых партиций.
## Получение таблиц из партиций
**Скачивание и импортирование партиций hits:**
```bash
curl -O https://clickhouse-datasets.s3.yandex.net/hits/partitions/hits_v1.tar
tar xvf hits_v1.tar -C /var/lib/clickhouse # путь к папке с данными ClickHouse
# убедитесь, что установлены корректные права доступа на файлы
sudo service clickhouse-server restart
clickhouse-client --query "SELECT COUNT(*) FROM datasets.hits_v1"
```
**Скачивание и импортирование партиций visits:**
```bash
curl -O https://clickhouse-datasets.s3.yandex.net/visits/partitions/visits_v1.tar
tar xvf visits_v1.tar -C /var/lib/clickhouse # путь к папке с данными ClickHouse
# убедитесь, что установлены корректные права доступа на файлы
sudo service clickhouse-server restart
clickhouse-client --query "SELECT COUNT(*) FROM datasets.visits_v1"
```
## Получение таблиц из сжатых tsv-файлов
**Скачивание и импортирование hits из сжатого tsv-файла**
```bash
curl https://clickhouse-datasets.s3.yandex.net/hits/tsv/hits_v1.tsv.xz | unxz --threads=`nproc` > hits_v1.tsv
# теперь создадим таблицу
clickhouse-client --query "CREATE DATABASE IF NOT EXISTS datasets"
clickhouse-client --query "CREATE TABLE datasets.hits_v1 ( WatchID UInt64, JavaEnable UInt8, Title String, GoodEvent Int16, EventTime DateTime, EventDate Date, CounterID UInt32, ClientIP UInt32, ClientIP6 FixedString(16), RegionID UInt32, UserID UInt64, CounterClass Int8, OS UInt8, UserAgent UInt8, URL String, Referer String, URLDomain String, RefererDomain String, Refresh UInt8, IsRobot UInt8, RefererCategories Array(UInt16), URLCategories Array(UInt16), URLRegions Array(UInt32), RefererRegions Array(UInt32), ResolutionWidth UInt16, ResolutionHeight UInt16, ResolutionDepth UInt8, FlashMajor UInt8, FlashMinor UInt8, FlashMinor2 String, NetMajor UInt8, NetMinor UInt8, UserAgentMajor UInt16, UserAgentMinor FixedString(2), CookieEnable UInt8, JavascriptEnable UInt8, IsMobile UInt8, MobilePhone UInt8, MobilePhoneModel String, Params String, IPNetworkID UInt32, TraficSourceID Int8, SearchEngineID UInt16, SearchPhrase String, AdvEngineID UInt8, IsArtifical UInt8, WindowClientWidth UInt16, WindowClientHeight UInt16, ClientTimeZone Int16, ClientEventTime DateTime, SilverlightVersion1 UInt8, SilverlightVersion2 UInt8, SilverlightVersion3 UInt32, SilverlightVersion4 UInt16, PageCharset String, CodeVersion UInt32, IsLink UInt8, IsDownload UInt8, IsNotBounce UInt8, FUniqID UInt64, HID UInt32, IsOldCounter UInt8, IsEvent UInt8, IsParameter UInt8, DontCountHits UInt8, WithHash UInt8, HitColor FixedString(1), UTCEventTime DateTime, Age UInt8, Sex UInt8, Income UInt8, Interests UInt16, Robotness UInt8, GeneralInterests Array(UInt16), RemoteIP UInt32, RemoteIP6 FixedString(16), WindowName Int32, OpenerName Int32, HistoryLength Int16, BrowserLanguage FixedString(2), BrowserCountry FixedString(2), SocialNetwork String, SocialAction String, HTTPError UInt16, SendTiming Int32, DNSTiming Int32, ConnectTiming Int32, ResponseStartTiming Int32, ResponseEndTiming Int32, FetchTiming Int32, RedirectTiming Int32, DOMInteractiveTiming Int32, DOMContentLoadedTiming Int32, DOMCompleteTiming Int32, LoadEventStartTiming Int32, LoadEventEndTiming Int32, NSToDOMContentLoadedTiming Int32, FirstPaintTiming Int32, RedirectCount Int8, SocialSourceNetworkID UInt8, SocialSourcePage String, ParamPrice Int64, ParamOrderID String, ParamCurrency FixedString(3), ParamCurrencyID UInt16, GoalsReached Array(UInt32), OpenstatServiceName String, OpenstatCampaignID String, OpenstatAdID String, OpenstatSourceID String, UTMSource String, UTMMedium String, UTMCampaign String, UTMContent String, UTMTerm String, FromTag String, HasGCLID UInt8, RefererHash UInt64, URLHash UInt64, CLID UInt32, YCLID UInt64, ShareService String, ShareURL String, ShareTitle String, ParsedParams Nested(Key1 String, Key2 String, Key3 String, Key4 String, Key5 String, ValueDouble Float64), IslandID FixedString(16), RequestNum UInt32, RequestTry UInt8) ENGINE = MergeTree() PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID) SETTINGS index_granularity = 8192"
# импортируем данные
cat hits_v1.tsv | clickhouse-client --query "INSERT INTO datasets.hits_v1 FORMAT TSV" --max_insert_block_size=100000
# опционально можно оптимизировать таблицу
clickhouse-client --query "OPTIMIZE TABLE datasets.hits_v1 FINAL"
clickhouse-client --query "SELECT COUNT(*) FROM datasets.hits_v1"
```
**Скачивание и импортирование visits из сжатого tsv-файла**
```bash
curl https://clickhouse-datasets.s3.yandex.net/visits/tsv/visits_v1.tsv.xz | unxz --threads=`nproc` > visits_v1.tsv
# теперь создадим таблицу
clickhouse-client --query "CREATE DATABASE IF NOT EXISTS datasets"
clickhouse-client --query "CREATE TABLE datasets.visits_v1 ( CounterID UInt32, StartDate Date, Sign Int8, IsNew UInt8, VisitID UInt64, UserID UInt64, StartTime DateTime, Duration UInt32, UTCStartTime DateTime, PageViews Int32, Hits Int32, IsBounce UInt8, Referer String, StartURL String, RefererDomain String, StartURLDomain String, EndURL String, LinkURL String, IsDownload UInt8, TraficSourceID Int8, SearchEngineID UInt16, SearchPhrase String, AdvEngineID UInt8, PlaceID Int32, RefererCategories Array(UInt16), URLCategories Array(UInt16), URLRegions Array(UInt32), RefererRegions Array(UInt32), IsYandex UInt8, GoalReachesDepth Int32, GoalReachesURL Int32, GoalReachesAny Int32, SocialSourceNetworkID UInt8, SocialSourcePage String, MobilePhoneModel String, ClientEventTime DateTime, RegionID UInt32, ClientIP UInt32, ClientIP6 FixedString(16), RemoteIP UInt32, RemoteIP6 FixedString(16), IPNetworkID UInt32, SilverlightVersion3 UInt32, CodeVersion UInt32, ResolutionWidth UInt16, ResolutionHeight UInt16, UserAgentMajor UInt16, UserAgentMinor UInt16, WindowClientWidth UInt16, WindowClientHeight UInt16, SilverlightVersion2 UInt8, SilverlightVersion4 UInt16, FlashVersion3 UInt16, FlashVersion4 UInt16, ClientTimeZone Int16, OS UInt8, UserAgent UInt8, ResolutionDepth UInt8, FlashMajor UInt8, FlashMinor UInt8, NetMajor UInt8, NetMinor UInt8, MobilePhone UInt8, SilverlightVersion1 UInt8, Age UInt8, Sex UInt8, Income UInt8, JavaEnable UInt8, CookieEnable UInt8, JavascriptEnable UInt8, IsMobile UInt8, BrowserLanguage UInt16, BrowserCountry UInt16, Interests UInt16, Robotness UInt8, GeneralInterests Array(UInt16), Params Array(String), Goals Nested(ID UInt32, Serial UInt32, EventTime DateTime, Price Int64, OrderID String, CurrencyID UInt32), WatchIDs Array(UInt64), ParamSumPrice Int64, ParamCurrency FixedString(3), ParamCurrencyID UInt16, ClickLogID UInt64, ClickEventID Int32, ClickGoodEvent Int32, ClickEventTime DateTime, ClickPriorityID Int32, ClickPhraseID Int32, ClickPageID Int32, ClickPlaceID Int32, ClickTypeID Int32, ClickResourceID Int32, ClickCost UInt32, ClickClientIP UInt32, ClickDomainID UInt32, ClickURL String, ClickAttempt UInt8, ClickOrderID UInt32, ClickBannerID UInt32, ClickMarketCategoryID UInt32, ClickMarketPP UInt32, ClickMarketCategoryName String, ClickMarketPPName String, ClickAWAPSCampaignName String, ClickPageName String, ClickTargetType UInt16, ClickTargetPhraseID UInt64, ClickContextType UInt8, ClickSelectType Int8, ClickOptions String, ClickGroupBannerID Int32, OpenstatServiceName String, OpenstatCampaignID String, OpenstatAdID String, OpenstatSourceID String, UTMSource String, UTMMedium String, UTMCampaign String, UTMContent String, UTMTerm String, FromTag String, HasGCLID UInt8, FirstVisit DateTime, PredLastVisit Date, LastVisit Date, TotalVisits UInt32, TraficSource Nested(ID Int8, SearchEngineID UInt16, AdvEngineID UInt8, PlaceID UInt16, SocialSourceNetworkID UInt8, Domain String, SearchPhrase String, SocialSourcePage String), Attendance FixedString(16), CLID UInt32, YCLID UInt64, NormalizedRefererHash UInt64, SearchPhraseHash UInt64, RefererDomainHash UInt64, NormalizedStartURLHash UInt64, StartURLDomainHash UInt64, NormalizedEndURLHash UInt64, TopLevelDomain UInt64, URLScheme UInt64, OpenstatServiceNameHash UInt64, OpenstatCampaignIDHash UInt64, OpenstatAdIDHash UInt64, OpenstatSourceIDHash UInt64, UTMSourceHash UInt64, UTMMediumHash UInt64, UTMCampaignHash UInt64, UTMContentHash UInt64, UTMTermHash UInt64, FromHash UInt64, WebVisorEnabled UInt8, WebVisorActivity UInt32, ParsedParams Nested(Key1 String, Key2 String, Key3 String, Key4 String, Key5 String, ValueDouble Float64), Market Nested(Type UInt8, GoalID UInt32, OrderID String, OrderPrice Int64, PP UInt32, DirectPlaceID UInt32, DirectOrderID UInt32, DirectBannerID UInt32, GoodID String, GoodName String, GoodQuantity Int32, GoodPrice Int64), IslandID FixedString(16)) ENGINE = CollapsingMergeTree(StartDate, intHash32(UserID), (CounterID, StartDate, intHash32(UserID), VisitID), 8192, Sign)"
# импортируем данные
cat visits_v1.tsv | clickhouse-client --query "INSERT INTO datasets.visits_v1 FORMAT TSV" --max_insert_block_size=100000
# опционально можно оптимизировать таблицу
clickhouse-client --query "OPTIMIZE TABLE datasets.visits_v1 FINAL"
clickhouse-client --query "SELECT COUNT(*) FROM datasets.visits_v1"
```
## Запросы
Примеры запросов к этим таблицам (они называются `test.hits` и `test.visits`) можно найти среди [stateful тестов](https://github.com/yandex/ClickHouse/tree/master/dbms/tests/queries/1_stateful) и в некоторых [performance тестах](https://github.com/yandex/ClickHouse/tree/master/dbms/tests/performance/test_hits) ClickHouse.

View File

@ -284,7 +284,7 @@ curl -O https://clickhouse-datasets.s3.yandex.net/trips_mergetree/partitions/tri
tar xvf trips_mergetree.tar -C /var/lib/clickhouse # путь к папке с данными ClickHouse
# убедитесь, что установлены корректные права доступа на файлы
sudo service clickhouse-server restart
clickhouse-client --query "select count(*) from datasets.trips_mergetree"
clickhouse-client --query "SELECT COUNT(*) FROM datasets.trips_mergetree"
```
!!!info

View File

@ -151,7 +151,7 @@ curl -O https://clickhouse-datasets.s3.yandex.net/ontime/partitions/ontime.tar
tar xvf ontime.tar -C /var/lib/clickhouse # путь к папке с данными ClickHouse
# убедитесь, что установлены корректные права доступа на файлы
sudo service clickhouse-server restart
clickhouse-client --query "select count(*) from datasets.ontime"
clickhouse-client --query "SELECT COUNT(*) FROM datasets.ontime"
```
!!!info

View File

@ -16,6 +16,7 @@ nav:
- 'WikiStat': 'getting_started/example_datasets/wikistat.md'
- 'Terabyte Click Logs from Criteo': 'getting_started/example_datasets/criteo.md'
- 'Star Schema Benchmark': 'getting_started/example_datasets/star_schema.md'
- 'Yandex.Metrica Data': 'getting_started/example_datasets/metrica.md'
- 'Interfaces':
- 'Introduction': 'interfaces/index.md'

View File

@ -16,6 +16,7 @@ nav:
- 'WikiStat': 'getting_started/example_datasets/wikistat.md'
- 'Терабайт логов кликов от Criteo': 'getting_started/example_datasets/criteo.md'
- 'Схема «Звезда»': 'getting_started/example_datasets/star_schema.md'
- 'Данные Яндекс.Метрики': 'getting_started/example_datasets/metrica.md'
- 'Интерфейсы':
- 'Введение': 'interfaces/index.md'

View File

@ -18,7 +18,6 @@ add_library (common ${LINK_MODE}
src/mremap.cpp
src/JSON.cpp
src/getMemoryAmount.cpp
src/ThreadPool.cpp
src/demangle.cpp
src/SetTerminalEcho.cpp
@ -34,11 +33,9 @@ add_library (common ${LINK_MODE}
include/common/mremap.h
include/common/likely.h
include/common/logger_useful.h
include/common/MultiVersion.h
include/common/strong_typedef.h
include/common/JSON.h
include/common/getMemoryAmount.h
include/common/ThreadPool.h
include/common/demangle.h
include/common/SetTerminalEcho.h
include/common/find_symbols.h

View File

@ -1,76 +0,0 @@
#pragma once
#include <cstdint>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <queue>
#include <vector>
/** Very simple thread pool similar to boost::threadpool.
* Advantages:
* - catches exceptions and rethrows on wait.
*/
class ThreadPool
{
public:
using Job = std::function<void()>;
/// Size is constant, all threads are created immediately.
explicit ThreadPool(size_t m_size);
/// Add new job. Locks until free thread in pool become available or exception in one of threads was thrown.
/// If an exception in some thread was thrown, method silently returns, and exception will be rethrown only on call to 'wait' function.
void schedule(Job job);
/// Wait for all currently active jobs to be done.
/// You may call schedule and wait many times in arbitary order.
/// If any thread was throw an exception, first exception will be rethrown from this method,
/// and exception will be cleared.
void wait();
/// Waits for all threads. Doesn't rethrow exceptions (use 'wait' method to rethrow exceptions).
/// You should not destroy object while calling schedule or wait methods from another threads.
~ThreadPool();
size_t size() const { return m_size; }
/// Returns number of active jobs.
size_t active() const;
private:
mutable std::mutex mutex;
std::condition_variable has_free_thread;
std::condition_variable has_new_job_or_shutdown;
const size_t m_size;
size_t active_jobs = 0;
bool shutdown = false;
std::queue<Job> jobs;
std::vector<std::thread> threads;
std::exception_ptr first_exception;
void worker();
void finalize();
};
/// Allows to save first catched exception in jobs and postpone its rethrow.
class ExceptionHandler
{
public:
void setException(std::exception_ptr && exception);
void throwIfException();
private:
std::exception_ptr first_exception;
std::mutex mutex;
};
ThreadPool::Job createExceptionHandledJob(ThreadPool::Job job, ExceptionHandler & handler);

View File

@ -1,161 +0,0 @@
#include <common/ThreadPool.h>
#include <iostream>
ThreadPool::ThreadPool(size_t m_size)
: m_size(m_size)
{
threads.reserve(m_size);
try
{
for (size_t i = 0; i < m_size; ++i)
threads.emplace_back([this] { worker(); });
}
catch (...)
{
finalize();
throw;
}
}
void ThreadPool::schedule(Job job)
{
{
std::unique_lock<std::mutex> lock(mutex);
has_free_thread.wait(lock, [this] { return active_jobs < m_size || shutdown; });
if (shutdown)
return;
jobs.push(std::move(job));
++active_jobs;
}
has_new_job_or_shutdown.notify_one();
}
void ThreadPool::wait()
{
{
std::unique_lock<std::mutex> lock(mutex);
has_free_thread.wait(lock, [this] { return active_jobs == 0; });
if (first_exception)
{
std::exception_ptr exception;
std::swap(exception, first_exception);
std::rethrow_exception(exception);
}
}
}
ThreadPool::~ThreadPool()
{
finalize();
}
void ThreadPool::finalize()
{
{
std::unique_lock<std::mutex> lock(mutex);
shutdown = true;
}
has_new_job_or_shutdown.notify_all();
for (auto & thread : threads)
thread.join();
threads.clear();
}
size_t ThreadPool::active() const
{
std::unique_lock<std::mutex> lock(mutex);
return active_jobs;
}
void ThreadPool::worker()
{
while (true)
{
Job job;
bool need_shutdown = false;
{
std::unique_lock<std::mutex> lock(mutex);
has_new_job_or_shutdown.wait(lock, [this] { return shutdown || !jobs.empty(); });
need_shutdown = shutdown;
if (!jobs.empty())
{
job = std::move(jobs.front());
jobs.pop();
}
else
{
return;
}
}
if (!need_shutdown)
{
try
{
job();
}
catch (...)
{
{
std::unique_lock<std::mutex> lock(mutex);
if (!first_exception)
first_exception = std::current_exception();
shutdown = true;
--active_jobs;
}
has_free_thread.notify_all();
has_new_job_or_shutdown.notify_all();
return;
}
}
{
std::unique_lock<std::mutex> lock(mutex);
--active_jobs;
}
has_free_thread.notify_all();
}
}
void ExceptionHandler::setException(std::exception_ptr && exception)
{
std::unique_lock<std::mutex> lock(mutex);
if (!first_exception)
first_exception = std::move(exception);
}
void ExceptionHandler::throwIfException()
{
std::unique_lock<std::mutex> lock(mutex);
if (first_exception)
std::rethrow_exception(first_exception);
}
ThreadPool::Job createExceptionHandledJob(ThreadPool::Job job, ExceptionHandler & handler)
{
return [job{std::move(job)}, &handler] ()
{
try
{
job();
}
catch (...)
{
handler.setException(std::current_exception());
}
};
}

View File

@ -5,7 +5,6 @@ add_executable (date_lut2 date_lut2.cpp)
add_executable (date_lut3 date_lut3.cpp)
add_executable (date_lut4 date_lut4.cpp)
add_executable (date_lut_default_timezone date_lut_default_timezone.cpp)
add_executable (multi_version multi_version.cpp)
add_executable (local_date_time_comparison local_date_time_comparison.cpp)
add_executable (realloc-perf allocator.cpp)
@ -16,10 +15,8 @@ target_link_libraries (date_lut2 common ${PLATFORM_LIBS})
target_link_libraries (date_lut3 common ${PLATFORM_LIBS})
target_link_libraries (date_lut4 common ${PLATFORM_LIBS})
target_link_libraries (date_lut_default_timezone common ${PLATFORM_LIBS})
target_link_libraries (multi_version common)
target_link_libraries (local_date_time_comparison common)
target_link_libraries (realloc-perf common)
add_check(multi_version)
add_check(local_date_time_comparison)
if(USE_GTEST)

View File

@ -1000,8 +1000,6 @@ void BaseDaemon::initialize(Application & self)
}
initializeTerminationAndSignalProcessing();
DB::CurrentThread::get(); /// TODO Why do we need this?
logRevision();
for (const auto & key : DB::getMultipleKeysFromConfig(config(), "", "graphite"))

View File

@ -11,7 +11,7 @@
#include <Poco/Exception.h>
#include <Common/Exception.h>
#include <Common/randomSeed.h>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
#include <Common/Stopwatch.h>
#include <IO/BufferWithOwnMemory.h>
#include <cstdlib>

View File

@ -1,5 +1,5 @@
#if __APPLE__ || __FreeBSD__
int main(int argc, char ** argv) { return 0; }
int main(int, char **) { return 0; }
#else
#include <fcntl.h>
@ -11,7 +11,7 @@ int main(int argc, char ** argv) { return 0; }
#include <vector>
#include <Poco/Exception.h>
#include <Common/Exception.h>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
#include <Common/Stopwatch.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/ReadHelpers.h>
@ -22,10 +22,7 @@ int main(int argc, char ** argv) { return 0; }
#include <sys/stat.h>
#include <sys/types.h>
#include <IO/AIO.h>
#if !defined(__APPLE__) && !defined(__FreeBSD__)
#include <malloc.h>
#endif
#include <malloc.h>
#include <sys/syscall.h>

View File

@ -16,7 +16,7 @@
#include <Poco/Exception.h>
#include <Common/Exception.h>
#include <Common/randomSeed.h>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
#include <Common/Stopwatch.h>
#include <port/clock.h>