mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
fix async inserts with alter
This commit is contained in:
parent
00aa60ca03
commit
1aaf9a08c4
@ -33,6 +33,8 @@
|
||||
#include <Common/SensitiveDataMasker.h>
|
||||
#include <Common/SipHash.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Parsers/ASTExpressionList.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
@ -308,6 +310,7 @@ void AsynchronousInsertQueue::preprocessInsertQuery(const ASTPtr & query, const
|
||||
/* no_squash */ false,
|
||||
/* no_destination */ false,
|
||||
/* async_insert */ false);
|
||||
|
||||
auto table = interpreter.getTable(insert_query);
|
||||
auto sample_block = InterpreterInsertQuery::getSampleBlock(insert_query, table, table->getInMemoryMetadataPtr(), query_context);
|
||||
|
||||
@ -318,6 +321,10 @@ void AsynchronousInsertQueue::preprocessInsertQuery(const ASTPtr & query, const
|
||||
/// InterpreterInsertQuery::getTable() -> ITableFunction::execute().
|
||||
if (insert_query.table_id)
|
||||
query_context->checkAccess(AccessType::INSERT, insert_query.table_id, sample_block.getNames());
|
||||
|
||||
insert_query.columns = std::make_shared<ASTExpressionList>();
|
||||
for (const auto & column : sample_block)
|
||||
insert_query.columns->children.push_back(std::make_shared<ASTIdentifier>(column.name));
|
||||
}
|
||||
|
||||
AsynchronousInsertQueue::PushResult
|
||||
@ -872,36 +879,35 @@ try
|
||||
}
|
||||
};
|
||||
|
||||
Chunk chunk;
|
||||
auto header = pipeline.getHeader();
|
||||
|
||||
if (key.data_kind == DataKind::Parsed)
|
||||
chunk = processEntriesWithParsing(key, data, header, insert_context, log, add_entry_to_asynchronous_insert_log);
|
||||
else
|
||||
chunk = processPreprocessedEntries(key, data, header, insert_context, add_entry_to_asynchronous_insert_log);
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::AsyncInsertRows, chunk.getNumRows());
|
||||
|
||||
auto log_and_add_finish_to_query_log = [&](size_t num_rows, size_t num_bytes)
|
||||
{
|
||||
LOG_DEBUG(log, "Flushed {} rows, {} bytes for query '{}'", num_rows, num_bytes, key.query_str);
|
||||
queue_shard_flush_time_history.updateWithCurrentTime();
|
||||
|
||||
bool pulling_pipeline = false;
|
||||
logQueryFinish(
|
||||
query_log_elem, insert_context, key.query, pipeline, pulling_pipeline, query_span, QueryCache::Usage::None, internal);
|
||||
};
|
||||
|
||||
|
||||
if (chunk.getNumRows() == 0)
|
||||
{
|
||||
finish_entries();
|
||||
log_and_add_finish_to_query_log(0, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
Chunk chunk;
|
||||
auto header = pipeline.getHeader();
|
||||
|
||||
if (key.data_kind == DataKind::Parsed)
|
||||
chunk = processEntriesWithParsing(key, data, header, insert_context, log, add_entry_to_asynchronous_insert_log);
|
||||
else
|
||||
chunk = processPreprocessedEntries(key, data, header, insert_context, add_entry_to_asynchronous_insert_log);
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::AsyncInsertRows, chunk.getNumRows());
|
||||
|
||||
auto log_and_add_finish_to_query_log = [&](size_t num_rows, size_t num_bytes)
|
||||
{
|
||||
LOG_DEBUG(log, "Flushed {} rows, {} bytes for query '{}'", num_rows, num_bytes, key.query_str);
|
||||
queue_shard_flush_time_history.updateWithCurrentTime();
|
||||
|
||||
bool pulling_pipeline = false;
|
||||
logQueryFinish(
|
||||
query_log_elem, insert_context, key.query, pipeline, pulling_pipeline, query_span, QueryCache::Usage::None, internal);
|
||||
};
|
||||
|
||||
if (chunk.getNumRows() == 0)
|
||||
{
|
||||
finish_entries();
|
||||
log_and_add_finish_to_query_log(0, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
size_t num_rows = chunk.getNumRows();
|
||||
size_t num_bytes = chunk.bytes();
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user