minor changes near async inserts

This commit is contained in:
Anton Popov 2021-09-01 18:06:11 +03:00
parent fb0790cf82
commit 5e694596c9
4 changed files with 11 additions and 32 deletions

View File

@ -27,24 +27,6 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED;
}
AsynchronousInsertQueue::InsertQuery::InsertQuery(const ASTPtr & query_, const Settings & settings_)
: query(query_->clone()), settings(settings_)
{
}
AsynchronousInsertQueue::InsertQuery::InsertQuery(const InsertQuery & other)
: query(other.query->clone()), settings(other.settings)
{
}
AsynchronousInsertQueue::InsertQuery &
AsynchronousInsertQueue::InsertQuery::operator==(const InsertQuery & other)
{
query = other.query->clone();
settings = other.settings;
return *this;
}
UInt64 AsynchronousInsertQueue::InsertQuery::Hash::operator()(const InsertQuery & insert_query) const
{
SipHash hash;
@ -124,7 +106,7 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
}
}
void AsynchronousInsertQueue::scheduleProcessJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context)
void AsynchronousInsertQueue::scheduleProcessDataJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context)
{
/// Wrap 'unique_ptr' with 'shared_ptr' to make this
/// lambda copyable and allow to save it to the thread pool.
@ -178,10 +160,11 @@ void AsynchronousInsertQueue::push(const ASTPtr & query, const Settings & settin
currently_processing_queries.emplace(query_id, entry);
}
LOG_INFO(log, "Queue size {} for query '{}'", data->size, queryToString(*query));
LOG_INFO(log, "Have {} pending inserts with total {} bytes of data for query '{}'",
data->entries.size(), data->size, queryToString(*query));
if (data->size > max_data_size)
scheduleProcessJob(key, std::move(data), getContext());
scheduleProcessDataJob(key, std::move(data), getContext());
}
void AsynchronousInsertQueue::waitForProcessingQuery(const String & query_id, const Milliseconds & timeout)
@ -226,7 +209,7 @@ void AsynchronousInsertQueue::busyCheck()
auto lag = std::chrono::steady_clock::now() - elem->data->first_update;
if (lag >= busy_timeout)
scheduleProcessJob(key, std::move(elem->data), getContext());
scheduleProcessDataJob(key, std::move(elem->data), getContext());
else
timeout = std::min(timeout, std::chrono::ceil<std::chrono::seconds>(busy_timeout - lag));
}
@ -248,7 +231,7 @@ void AsynchronousInsertQueue::staleCheck()
auto lag = std::chrono::steady_clock::now() - elem->data->last_update;
if (lag >= stale_timeout)
scheduleProcessJob(key, std::move(elem->data), getContext());
scheduleProcessDataJob(key, std::move(elem->data), getContext());
}
}
}
@ -354,7 +337,7 @@ try
auto out_executor = out_pipeline.execute();
out_executor->execute(out_pipeline.getNumThreads());
LOG_DEBUG(log, "Flushed {} rows, {} bytes for query '{}'",
LOG_INFO(log, "Flushed {} rows, {} bytes for query '{}'",
total_rows, total_bytes, queryToString(key.query));
for (const auto & entry : data->entries)

View File

@ -41,9 +41,6 @@ private:
ASTPtr query;
Settings settings;
InsertQuery(const ASTPtr & query_, const Settings & settings_);
InsertQuery(const InsertQuery & other);
InsertQuery & operator==(const InsertQuery & other);
bool operator==(const InsertQuery & other) const;
struct Hash { UInt64 operator()(const InsertQuery & insert_query) const; };
};
@ -124,7 +121,7 @@ private:
void staleCheck();
void cleanup();
void scheduleProcessJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context);
void scheduleProcessDataJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context);
static void processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context);
};

View File

@ -563,7 +563,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (settings.wait_for_async_insert)
{
auto timeout = settings.wait_for_async_insert_timeout.totalMilliseconds();
auto source = std::make_shared<WaitForAsyncInsertSource>(Block(), query_id, timeout, context->getGlobalContext());
auto source = std::make_shared<WaitForAsyncInsertSource>(query_id, timeout, context->getGlobalContext());
io.pipeline.init(Pipe(source));
}

View File

@ -10,9 +10,8 @@ class WaitForAsyncInsertSource : public ISource, WithContext
{
public:
WaitForAsyncInsertSource(
const Block & header, const String & query_id_,
size_t timeout_ms_, ContextPtr context_)
: ISource(std::move(header))
const String & query_id_, size_t timeout_ms_, ContextPtr context_)
: ISource(Block())
, WithContext(context_)
, query_id(query_id_)
, timeout_ms(timeout_ms_)