Ivan Lezhankin 2021-04-19 17:51:26 +03:00
parent 7f43dddf0b
commit 6ed9e34750
7 changed files with 118 additions and 32 deletions

View File

@ -428,6 +428,12 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_WARNING(log, "Server was built with sanitizer. It will work slowly.");
#endif
// Initialize global thread pool. Do it before we fetch configs from zookeeper
// nodes (`from_zk`), because ZooKeeper interface uses the pool. We will
// ignore `max_thread_pool_size` in configs we fetch from ZK, but oh well.
// Also do it before the global context is initialized, since it may also use threads from the global pool.
GlobalThreadPool::initialize(config().getUInt("max_thread_pool_size", 10000));
/** Context contains everything that query execution depends on:
* settings, available functions, data types, aggregate functions, databases, ...
*/
@ -437,11 +443,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->makeGlobalContext();
global_context->setApplicationType(Context::ApplicationType::SERVER);
// Initialize global thread pool. Do it before we fetch configs from zookeeper
// nodes (`from_zk`), because ZooKeeper interface uses the pool. We will
// ignore `max_thread_pool_size` in configs we fetch from ZK, but oh well.
GlobalThreadPool::initialize(config().getUInt("max_thread_pool_size", 10000));
bool has_zookeeper = config().has("zookeeper");
zkutil::ZooKeeperNodeCache main_config_zk_node_cache([&] { return global_context->getZooKeeper(); });
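// Illustrative sketch, not from this commit: the ordering constraint described above.
// The global pool must exist before anything schedules a job on it, e.g. the ZooKeeper
// client created while fetching remote configs, or the global context. The lambda body
// below is made up; GlobalThreadPool and its methods are the ones used in this hunk.
GlobalThreadPool::initialize(config().getUInt("max_thread_pool_size", 10000));
GlobalThreadPool::instance().scheduleOrThrowOnError([] { /* e.g. a ZooKeeper watch callback */ });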

View File

@ -46,7 +46,7 @@ class IBlockInputStream : public TypePromotion<IBlockInputStream>
public:
IBlockInputStream() { info.parent = this; }
virtual ~IBlockInputStream() {}
virtual ~IBlockInputStream() = default;
IBlockInputStream(const IBlockInputStream &) = delete;
IBlockInputStream & operator=(const IBlockInputStream &) = delete;

View File

@ -40,7 +40,7 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
format = "Values";
}
/// NOTE: can't create an input-format with an empty buffer here, because the parallel input-format starts to read immediately.
/// FIXME: can't create an input-format with an empty buffer here, because the parallel input-format starts to read immediately.
res_stream = context->getInputFormat(format, input_buffer, header, context->getSettings().max_insert_block_size);
if (context->getSettingsRef().input_format_defaults_for_omitted_fields && ast_insert_query->table_id && !input_function)

View File

@ -27,7 +27,7 @@ struct AsynchronousInsertQueue::InsertData
/// Timestamp of the first insert into the queue, or after the last queue dump.
/// Used to detect for how long the queue has been active, so we can dump it by timer.
std::chrono::time_point<std::chrono::steady_clock> first_update;
std::chrono::time_point<std::chrono::steady_clock> first_update = std::chrono::steady_clock::now();
/// Timestamp of the last insert into the queue.
/// Used to detect for how long the queue has been stale, so we can dump it by another timer.
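// Illustrative sketch, not from this commit: why the in-class initializer for first_update
// matters. A default-constructed steady_clock::time_point is the clock's epoch, so an entry
// whose timestamp is never set would look enormously old to the timer threads and be dumped
// at the first check. The standalone struct below is made up for the example.
struct Example
{
    std::chrono::time_point<std::chrono::steady_clock> first_update = std::chrono::steady_clock::now();
};
Example example;
auto age = std::chrono::steady_clock::now() - example.first_update; /// ~0 right after construction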
@ -67,16 +67,41 @@ bool AsynchronousInsertQueue::InsertQueryEquality::operator() (const InsertQuery
return true;
}
AsynchronousInsertQueue::AsynchronousInsertQueue(size_t pool_size, size_t max_data_size_)
: max_data_size(max_data_size_), lock(RWLockImpl::create()), queue(new Queue), pool(pool_size)
AsynchronousInsertQueue::AsynchronousInsertQueue(size_t pool_size, size_t max_data_size_, const Timeout & timeouts)
: max_data_size(max_data_size_)
, busy_timeout(timeouts.busy)
, stale_timeout(timeouts.stale)
, lock(RWLockImpl::create())
, queue(new Queue)
, pool(pool_size)
, dump_by_first_update_thread(&AsynchronousInsertQueue::busyCheck, this)
, dump_by_last_update_thread(&AsynchronousInsertQueue::staleCheck, this)
{
}
AsynchronousInsertQueue::~AsynchronousInsertQueue()
{
/// TODO: add a setting for graceful shutdown.
shutdown = true;
assert(dump_by_first_update_thread.joinable());
dump_by_first_update_thread.join();
assert(dump_by_last_update_thread.joinable());
dump_by_last_update_thread.join();
pool.wait();
}
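// Illustrative sketch, not from this commit: the start/stop pattern the constructor and
// destructor above implement, reduced to a standalone type. Watcher and run() are made-up
// names; ThreadFromGlobalPool and the atomic shutdown flag mirror the members of this class.
struct Watcher
{
    std::atomic<bool> shutdown{false};
    ThreadFromGlobalPool thread;

    Watcher() : thread([this] { run(); }) {} /// the loop starts as soon as the object is built

    ~Watcher()
    {
        shutdown = true; /// ask the loop to exit
        thread.join();   /// and wait for the current iteration to finish
    }

    void run()
    {
        while (!shutdown)
        {
            /// periodic work, e.g. busyCheck() or staleCheck()
        }
    }
};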
bool AsynchronousInsertQueue::push(ASTInsertQuery * query, const Settings & settings)
{
auto read_lock = lock->getLock(RWLockImpl::Read, String());
auto it = queue->find(InsertQuery{query->shared_from_this(), settings});
/// FIXME: we should take a data lock before reading `reset` or make this field atomic.
/// On the other hand, it looks fine even as it is, since we don't reset `data` explicitly.
if (it != queue->end() && !it->second->reset)
{
pushImpl(query, it);
@ -90,43 +115,82 @@ void AsynchronousInsertQueue::push(ASTInsertQuery * query, BlockIO && io, const
{
auto write_lock = lock->getLock(RWLockImpl::Write, String());
auto it = queue->find(InsertQuery{query->shared_from_this(), settings});
InsertQuery key{query->shared_from_this(), settings};
auto it = queue->find(key);
if (it == queue->end())
{
InsertQuery key{query->shared_from_this(), settings};
it = queue->insert({key, std::make_shared<InsertData>()}).first;
it->second->io = std::move(io);
it->second->first_update = std::chrono::steady_clock::now();
}
else
else if (it->second->reset)
{
std::unique_lock<std::mutex> data_lock(it->second->mutex);
it->second->reset = false;
it->second = std::make_shared<InsertData>();
it->second->io = std::move(io);
/// All other fields should have been already reset.
}
pushImpl(query, it);
}
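// Illustrative sketch, not from this commit: the two-step use of the push() overloads above,
// as far as it can be inferred from their signatures. interpretInsertQuery() is a hypothetical
// helper standing in for whatever builds the BlockIO on the slow path.
if (!async_queue->push(ast_insert, settings))
{
    /// No live entry for this query + settings yet: build the pipeline once,
    /// then hand it to the queue together with the data under the write lock.
    BlockIO io = interpretInsertQuery(ast_insert); /// hypothetical
    async_queue->push(ast_insert, std::move(io), settings);
}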
void AsynchronousInsertQueue::busyCheck()
{
auto timeout = busy_timeout;
while (!shutdown)
{
std::this_thread::sleep_for(timeout);
auto read_lock = lock->getLock(RWLockImpl::Read, String());
/// TODO: use priority queue instead of raw unsorted queue.
timeout = busy_timeout;
for (auto & [_, data] : *queue)
{
std::unique_lock<std::mutex> data_lock(data->mutex);
auto lag = std::chrono::steady_clock::now() - data->first_update;
if (lag >= busy_timeout)
pool.scheduleOrThrowOnError([data = data] { processData(data); });
else
timeout = std::min(timeout, std::chrono::ceil<std::chrono::seconds>(busy_timeout - lag));
}
}
}
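// Illustrative sketch, not from this commit: the adaptive sleep computed above, with made-up
// numbers. If busy_timeout is 5s and the oldest entry was first updated 3.2s ago, it is not
// dumped yet (3.2s < 5s), and the next wake-up is shortened to ceil(5s - 3.2s) = 2s instead
// of a full busy_timeout.
using namespace std::chrono;
const auto example_busy_timeout = seconds(5);
const auto example_lag = milliseconds(3200);
const auto next_sleep = std::min(example_busy_timeout, ceil<seconds>(example_busy_timeout - example_lag)); /// 2s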
void AsynchronousInsertQueue::staleCheck()
{
while (!shutdown)
{
std::this_thread::sleep_for(stale_timeout);
auto read_lock = lock->getLock(RWLockImpl::Read, String());
for (auto & [_, data] : *queue)
{
std::unique_lock<std::mutex> data_lock(data->mutex);
auto lag = std::chrono::steady_clock::now() - data->last_update;
if (lag >= stale_timeout)
pool.scheduleOrThrowOnError([data = data] { processData(data); });
}
}
}
void AsynchronousInsertQueue::pushImpl(ASTInsertQuery * query, QueueIterator & it)
{
ConcatReadBuffer::Buffers buffers;
ConcatReadBuffer concat_buf;
auto ast_buf = std::make_unique<ReadBufferFromMemory>(query->data, query->data ? query->end - query->data : 0);
if (query->data)
buffers.push_back(std::move(ast_buf));
concat_buf.appendBuffer(std::move(ast_buf));
if (query->tail)
buffers.push_back(wrapReadBufferReference(*query->tail));
concat_buf.appendBuffer(wrapReadBufferReference(*query->tail));
/// NOTE: we must not read from |query->tail| before reading everything between |query->data| and |query->end|.
ConcatReadBuffer concat_buf(std::move(buffers));
std::unique_lock<std::mutex> data_lock(it->second->mutex);
/// It's important to read all of a query's data as a single chunk, so that we can safely drop it in case of a parsing failure.
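// Illustrative sketch, not from this commit: the ordering guarantee the NOTE above relies on.
// ConcatReadBuffer drains its buffers strictly in append order, so the inline part of the
// INSERT (between |data| and |end|) is fully consumed before the tail from the socket.
// The strings are named so the non-owning ReadBufferFromString stays valid; values are made up.
std::string inline_part = "(1, 'a'), ";
std::string tail_part = "(2, 'b')";
ConcatReadBuffer example_buf;
example_buf.appendBuffer(std::make_unique<ReadBufferFromString>(inline_part));
example_buf.appendBuffer(std::make_unique<ReadBufferFromString>(tail_part));
String whole;
readStringUntilEOF(whole, example_buf); /// yields "(1, 'a'), (2, 'b')"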
@ -156,17 +220,21 @@ void AsynchronousInsertQueue::processData(std::shared_ptr<InsertData> data)
{
std::unique_lock<std::mutex> data_lock(data->mutex);
if (data->reset)
return;
auto in = std::dynamic_pointer_cast<InputStreamFromASTInsertQuery>(data->io.in);
assert(in);
auto log_progress = [](const Block & block)
{
LOG_INFO(&Poco::Logger::get("AsynchronousInsertQueue"), "Flushed {} rows", block.rows());
};
for (const auto & datum : data->data)
in->appendBuffer(std::make_unique<ReadBufferFromString>(datum));
copyData(*in, *data->io.out);
copyData(*in, *data->io.out, [] { return false; }, log_progress);
data->data.clear();
data->size = 0;
data->first_update = std::chrono::steady_clock::now();
data->last_update = data->first_update;
data->reset = true;
}

View File

@ -17,7 +17,14 @@ struct BlockIO;
class AsynchronousInsertQueue
{
public:
AsynchronousInsertQueue(size_t pool_size, size_t max_data_size);
/// Use a structure to allow and benefit from designated initialization and to avoid messing with positional arguments in the ctor.
struct Timeout
{
size_t busy, stale; /// in seconds
};
AsynchronousInsertQueue(size_t pool_size, size_t max_data_size, const Timeout & timeouts);
~AsynchronousInsertQueue();
bool push(ASTInsertQuery * query, const Settings & settings);
void push(ASTInsertQuery * query, BlockIO && io, const Settings & settings);
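// Illustrative sketch, not from this commit: the call style the Timeout struct is meant to
// enable (see the comment above it). The values mirror Context::initGlobal in this commit;
// the designated-initializer form is the suggested usage, while the commit itself passes
// Timeout{1, 1} positionally.
auto queue = std::make_shared<AsynchronousInsertQueue>(
    /* pool_size */ 16,
    /* max_data_size */ 1024,
    AsynchronousInsertQueue::Timeout{.busy = 1, .stale = 1});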
@ -42,24 +49,32 @@ class AsynchronousInsertQueue
/// The logic and events behind the queue are as follows:
/// - reset_timeout: if the queue is empty for some time, then we delete the queue and free all associated resources, e.g. tables.
/// - dump_timeout: if the queue is active for too long and there are a lot of rapid inserts, then we dump the data, so it doesn't
/// - busy_timeout: if the queue is active for too long and there are a lot of rapid inserts, then we dump the data, so it doesn't
/// grow for a long period of time and users will be able to select new data in a deterministic manner.
/// - stale_timeout: if the queue is stale for too long, then we dump the data too, so that users will be able to select the last
/// piece of inserted data.
/// - access_timeout: we also have to check periodically whether the user still has access to the tables, and if the access is lost, then
/// we dump pending data and delete the queue immediately.
/// - max_data_size: if the maximum size of data is reached, then again we dump the data.
using Queue = std::unordered_map<InsertQuery, std::shared_ptr<InsertData>, InsertQueryHash, InsertQueryEquality>;
using QueueIterator = Queue::iterator;
const size_t max_data_size; /// in bytes
const std::chrono::seconds busy_timeout, stale_timeout;
RWLock lock;
std::unique_ptr<Queue> queue;
std::atomic<bool> shutdown{false};
ThreadPool pool; /// dump the data only inside this pool.
ThreadFromGlobalPool dump_by_first_update_thread; /// uses busy_timeout and busyCheck()
ThreadFromGlobalPool dump_by_last_update_thread; /// uses stale_timeout and staleCheck()
/// TODO: ThreadFromGlobalPool remove_empty_thread, check_access_thread;
void busyCheck();
void staleCheck();
void pushImpl(ASTInsertQuery * query, QueueIterator & it); /// use only under lock
static void processData(std::shared_ptr<InsertData> data);

View File

@ -513,7 +513,7 @@ ContextPtr Context::createGlobal(ContextSharedPart * shared)
void Context::initGlobal()
{
DatabaseCatalog::init(shared_from_this());
shared->async_insert_queue = std::make_shared<AsynchronousInsertQueue>(16, 1024);
shared->async_insert_queue = std::make_shared<AsynchronousInsertQueue>(16, 1024, AsynchronousInsertQueue::Timeout{1, 1});
}
SharedContextHolder Context::createShared()

View File

@ -293,6 +293,8 @@ BlockIO InterpreterInsertQuery::execute()
}
else if (query.select || query.watch)
{
/// XXX: is this branch also triggered for select+input() case?
const auto & header = out_streams.at(0)->getHeader();
auto actions_dag = ActionsDAG::makeConvertingActions(
res.pipeline.getHeader().getColumnsWithTypeAndName(),