#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int TIMEOUT_EXCEEDED; extern const int UNKNOWN_EXCEPTION; extern const int UNKNOWN_FORMAT; } AsynchronousInsertQueue::InsertQuery::InsertQuery(const ASTPtr & query_, const Settings & settings_) : query(query_->clone()), settings(settings_) { } AsynchronousInsertQueue::InsertQuery::InsertQuery(const InsertQuery & other) : query(other.query->clone()), settings(other.settings) { } AsynchronousInsertQueue::InsertQuery & AsynchronousInsertQueue::InsertQuery::operator=(const InsertQuery & other) { if (this != &other) { query = other.query->clone(); settings = other.settings; } return *this; } UInt64 AsynchronousInsertQueue::InsertQuery::Hash::operator()(const InsertQuery & insert_query) const { SipHash hash; insert_query.query->updateTreeHash(hash); for (const auto & setting : insert_query.settings.allChanged()) { hash.update(setting.getName()); applyVisitor(FieldVisitorHash(hash), setting.getValue()); } return hash.get64(); } bool AsynchronousInsertQueue::InsertQuery::operator==(const InsertQuery & other) const { return queryToString(query) == queryToString(other.query) && settings == other.settings; } AsynchronousInsertQueue::InsertData::Entry::Entry(String && bytes_, String && query_id_) : bytes(std::move(bytes_)), query_id(std::move(query_id_)) { } void AsynchronousInsertQueue::InsertData::Entry::finish(std::exception_ptr exception_) { std::lock_guard lock(mutex); finished = true; exception = exception_; cv.notify_all(); } bool AsynchronousInsertQueue::InsertData::Entry::wait(const Milliseconds & timeout) const { std::unique_lock lock(mutex); return cv.wait_for(lock, timeout, [&] { return finished; }); } bool AsynchronousInsertQueue::InsertData::Entry::isFinished() const { std::lock_guard lock(mutex); return finished; } std::exception_ptr AsynchronousInsertQueue::InsertData::Entry::getException() const { std::lock_guard lock(mutex); return exception; } AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, size_t max_data_size_, const Timeout & timeouts) : WithContext(context_) , max_data_size(max_data_size_) , busy_timeout(timeouts.busy) , stale_timeout(timeouts.stale) , pool(pool_size) , dump_by_first_update_thread(&AsynchronousInsertQueue::busyCheck, this) , cleanup_thread(&AsynchronousInsertQueue::cleanup, this) { using namespace std::chrono; assert(pool_size); if (stale_timeout > 0ms) dump_by_last_update_thread = ThreadFromGlobalPool(&AsynchronousInsertQueue::staleCheck, this); } AsynchronousInsertQueue::~AsynchronousInsertQueue() { /// TODO: add a setting for graceful shutdown. shutdown = true; assert(dump_by_first_update_thread.joinable()); dump_by_first_update_thread.join(); assert(cleanup_thread.joinable()); cleanup_thread.join(); if (dump_by_last_update_thread.joinable()) dump_by_last_update_thread.join(); pool.wait(); std::lock_guard lock(currently_processing_mutex); for (const auto & [_, entry] : currently_processing_queries) { if (!entry->isFinished()) entry->finish(std::make_exception_ptr(Exception( ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout exceeded)"))); } } void AsynchronousInsertQueue::scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context) { /// Wrap 'unique_ptr' with 'shared_ptr' to make this /// lambda copyable and allow to save it to the thread pool. pool.scheduleOrThrowOnError([=, data = std::make_shared(std::move(data))] { processData(std::move(key), std::move(*data), std::move(global_context)); }); } void AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context) { query = query->clone(); const auto & settings = query_context->getSettingsRef(); auto & insert_query = query->as(); InterpreterInsertQuery interpreter(query, query_context, settings.insert_allow_materialized_columns); auto table = interpreter.getTable(insert_query); auto sample_block = interpreter.getSampleBlock(insert_query, table, table->getInMemoryMetadataPtr()); if (!FormatFactory::instance().isInputFormat(insert_query.format)) throw Exception(ErrorCodes::UNKNOWN_FORMAT, "Unknown input format {}", insert_query.format); query_context->checkAccess(AccessType::INSERT, insert_query.table_id, sample_block.getNames()); String bytes; { auto read_buf = getReadBufferFromASTInsertQuery(query); WriteBufferFromString write_buf(bytes); copyData(*read_buf, write_buf); } auto entry = std::make_shared(std::move(bytes), query_context->getCurrentQueryId()); InsertQuery key{query, settings}; { /// Firstly try to get entry from queue without exclusive lock. std::shared_lock read_lock(rwlock); if (auto it = queue.find(key); it != queue.end()) { pushImpl(std::move(entry), it); return; } } std::unique_lock write_lock(rwlock); auto it = queue.emplace(key, std::make_shared()).first; pushImpl(std::move(entry), it); } void AsynchronousInsertQueue::pushImpl(InsertData::EntryPtr entry, QueueIterator it) { auto & [data_mutex, data] = *it->second; std::lock_guard data_lock(data_mutex); if (!data) data = std::make_unique(); data->size += entry->bytes.size(); data->last_update = std::chrono::steady_clock::now(); data->entries.emplace_back(entry); { std::lock_guard currently_processing_lock(currently_processing_mutex); currently_processing_queries.emplace(entry->query_id, entry); } LOG_TRACE(log, "Have {} pending inserts with total {} bytes of data for query '{}'", data->entries.size(), data->size, queryToString(it->first.query)); if (data->size > max_data_size) scheduleDataProcessingJob(it->first, std::move(data), getContext()); } void AsynchronousInsertQueue::waitForProcessingQuery(const String & query_id, const Milliseconds & timeout) { InsertData::EntryPtr entry; { std::lock_guard lock(currently_processing_mutex); auto it = currently_processing_queries.find(query_id); if (it == currently_processing_queries.end()) return; entry = it->second; } bool finished = entry->wait(timeout); if (!finished) throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout ({} ms) exceeded)", timeout.count()); if (auto exception = entry->getException()) std::rethrow_exception(exception); } void AsynchronousInsertQueue::busyCheck() { auto timeout = busy_timeout; while (!shutdown) { std::this_thread::sleep_for(timeout); /// TODO: use priority queue instead of raw unsorted queue. timeout = busy_timeout; std::shared_lock read_lock(rwlock); for (auto & [key, elem] : queue) { std::lock_guard data_lock(elem->mutex); if (!elem->data) continue; auto lag = std::chrono::steady_clock::now() - elem->data->first_update; if (lag >= busy_timeout) scheduleDataProcessingJob(key, std::move(elem->data), getContext()); else timeout = std::min(timeout, std::chrono::ceil(busy_timeout - lag)); } } } void AsynchronousInsertQueue::staleCheck() { while (!shutdown) { std::this_thread::sleep_for(stale_timeout); std::shared_lock read_lock(rwlock); for (auto & [key, elem] : queue) { std::lock_guard data_lock(elem->mutex); if (!elem->data) continue; auto lag = std::chrono::steady_clock::now() - elem->data->last_update; if (lag >= stale_timeout) scheduleDataProcessingJob(key, std::move(elem->data), getContext()); } } } void AsynchronousInsertQueue::cleanup() { /// Do not run cleanup too often, /// because it holds exclusive lock. auto timeout = busy_timeout * 5; while (!shutdown) { std::this_thread::sleep_for(timeout); std::vector keys_to_remove; { std::shared_lock read_lock(rwlock); for (auto & [key, elem] : queue) { std::lock_guard data_lock(elem->mutex); if (!elem->data) keys_to_remove.push_back(key); } } if (!keys_to_remove.empty()) { std::unique_lock write_lock(rwlock); size_t total_removed = 0; for (const auto & key : keys_to_remove) { auto it = queue.find(key); if (it != queue.end() && !it->second->data) { queue.erase(it); ++total_removed; } } if (total_removed) LOG_TRACE(log, "Removed stale entries for {} queries from asynchronous insertion queue", total_removed); } { std::vector ids_to_remove; std::lock_guard lock(currently_processing_mutex); for (const auto & [query_id, entry] : currently_processing_queries) if (entry->isFinished()) ids_to_remove.push_back(query_id); if (!ids_to_remove.empty()) { for (const auto & id : ids_to_remove) currently_processing_queries.erase(id); LOG_TRACE(log, "Removed {} finished entries from asynchronous insertion queue", ids_to_remove.size()); } } } } // static void AsynchronousInsertQueue::processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context) try { if (!data) return; const auto * log = &Poco::Logger::get("AsynchronousInsertQueue"); const auto & insert_query = assert_cast(*key.query); auto insert_context = Context::createCopy(global_context); /// 'resetParser' doesn't work for parallel parsing. key.settings.set("input_format_parallel_parsing", false); insert_context->makeQueryContext(); insert_context->setSettings(key.settings); /// Set initial_query_id, because it's used in InterpreterInsertQuery for table lock. insert_context->getClientInfo().query_kind = ClientInfo::QueryKind::INITIAL_QUERY; insert_context->setCurrentQueryId(""); InterpreterInsertQuery interpreter(key.query, insert_context, key.settings.insert_allow_materialized_columns, false, false, true); auto pipeline = interpreter.execute().pipeline; assert(pipeline.pushing()); auto header = pipeline.getHeader(); auto format = getInputFormatFromASTInsertQuery(key.query, false, header, insert_context, nullptr); size_t total_rows = 0; InsertData::EntryPtr current_entry; auto on_error = [&](const MutableColumns & result_columns, Exception & e) { LOG_ERROR(log, "Failed parsing for query '{}' with query id {}. {}", queryToString(key.query), current_entry->query_id, e.displayText()); for (const auto & column : result_columns) if (column->size() > total_rows) column->popBack(column->size() - total_rows); current_entry->finish(std::current_exception()); return 0; }; std::shared_ptr adding_defaults_transform; if (insert_context->getSettingsRef().input_format_defaults_for_omitted_fields) { StoragePtr storage = DatabaseCatalog::instance().getTable(insert_query.table_id, insert_context); auto metadata_snapshot = storage->getInMemoryMetadataPtr(); const auto & columns = metadata_snapshot->getColumns(); if (columns.hasDefaults()) adding_defaults_transform = std::make_shared(header, columns, *format, insert_context); } StreamingFormatExecutor executor(header, format, std::move(on_error), std::move(adding_defaults_transform)); std::unique_ptr last_buffer; for (const auto & entry : data->entries) { auto buffer = std::make_unique(entry->bytes); current_entry = entry; total_rows += executor.execute(*buffer); /// Keep buffer, because it still can be used /// in destructor, while resetting buffer at next iteration. last_buffer = std::move(buffer); } format->addBuffer(std::move(last_buffer)); auto chunk = Chunk(executor.getResultColumns(), total_rows); size_t total_bytes = chunk.bytes(); auto source = std::make_shared(header, std::move(chunk)); pipeline.complete(Pipe(std::move(source))); CompletedPipelineExecutor completed_executor(pipeline); completed_executor.execute(); LOG_INFO(log, "Flushed {} rows, {} bytes for query '{}'", total_rows, total_bytes, queryToString(key.query)); for (const auto & entry : data->entries) if (!entry->isFinished()) entry->finish(); } catch (const Exception & e) { finishWithException(key.query, data->entries, e); } catch (const Poco::Exception & e) { finishWithException(key.query, data->entries, e); } catch (const std::exception & e) { finishWithException(key.query, data->entries, e); } catch (...) { finishWithException(key.query, data->entries, Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Unknown exception")); } template void AsynchronousInsertQueue::finishWithException( const ASTPtr & query, const std::list & entries, const E & exception) { tryLogCurrentException("AsynchronousInsertQueue", fmt::format("Failed insertion for query '{}'", queryToString(query))); for (const auto & entry : entries) { if (!entry->isFinished()) { /// Make a copy of exception to avoid concurrent usage of /// one exception object from several threads. entry->finish(std::make_exception_ptr(exception)); } } } }