Merge branch 'master' into async-connect-to-multiple-ips

This commit is contained in:
Kruglov Pavel 2023-07-17 21:02:33 +02:00 committed by GitHub
commit 4ed8dcd436
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 1084 additions and 451 deletions

View File

@ -1255,3 +1255,15 @@ Result:
│ A240 │
└──────────────────┘
```
## initcap
Convert the first letter of each word to upper case and the rest to lower case. Words are sequences of alphanumeric characters separated by non-alphanumeric characters.
## initcapUTF8
Like [initcap](#initcap), assuming that the string contains valid UTF-8 encoded text. If this assumption is violated, no exception is thrown and the result is undefined.
Does not detect the language, e.g. for Turkish the result might not be exactly correct (i/İ vs. i/I).
If the length of the UTF-8 byte sequence is different for upper and lower case of a code point, the result may be incorrect for this code point.

View File

@ -1113,3 +1113,14 @@ A text with tags .
The content within <b>CDATA</b>
Do Nothing for 2 Minutes 2:00 &nbsp;
```
## initcap {#initcap}
Переводит первую букву каждого слова в строке в верхний регистр, а остальные — в нижний. Словами считаются последовательности алфавитно-цифровых символов, разделённые любыми другими символами.
## initcapUTF8 {#initcapUTF8}
Как [initcap](#initcap), предполагая, что строка содержит набор байтов, представляющий текст в кодировке UTF-8.
Не учитывает язык. То есть, для турецкого языка, результат может быть не совсем верным.
Если длина UTF-8 последовательности байтов различна для верхнего и нижнего регистра кодовой точки, то для этой кодовой точки результат работы может быть некорректным.
Если строка содержит набор байтов, не являющийся UTF-8, то поведение не определено.

View File

@ -67,29 +67,38 @@ struct AggregateFunctionBoundingRatioData
}
}
void serialize(WriteBuffer & buf) const
{
writeBinary(empty, buf);
if (!empty)
{
writePODBinary(left, buf);
writePODBinary(right, buf);
}
}
void deserialize(ReadBuffer & buf)
{
readBinary(empty, buf);
if (!empty)
{
readPODBinary(left, buf);
readPODBinary(right, buf);
}
}
void serialize(WriteBuffer & buf) const;
void deserialize(ReadBuffer & buf);
};
template <std::endian endian>
inline void transformEndianness(AggregateFunctionBoundingRatioData::Point & p)
{
transformEndianness<endian>(p.x);
transformEndianness<endian>(p.y);
}
void AggregateFunctionBoundingRatioData::serialize(WriteBuffer & buf) const
{
writeBinaryLittleEndian(empty, buf);
if (!empty)
{
writeBinaryLittleEndian(left, buf);
writeBinaryLittleEndian(right, buf);
}
}
void AggregateFunctionBoundingRatioData::deserialize(ReadBuffer & buf)
{
readBinaryLittleEndian(empty, buf);
if (!empty)
{
readBinaryLittleEndian(left, buf);
readBinaryLittleEndian(right, buf);
}
}
class AggregateFunctionBoundingRatio final : public IAggregateFunctionDataHelper<AggregateFunctionBoundingRatioData, AggregateFunctionBoundingRatio>
{

View File

@ -103,18 +103,18 @@ public:
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
writeIntBinary(this->data(place).sum, buf);
writeIntBinary(this->data(place).first, buf);
writeIntBinary(this->data(place).last, buf);
writePODBinary<bool>(this->data(place).seen, buf);
writeBinaryLittleEndian(this->data(place).sum, buf);
writeBinaryLittleEndian(this->data(place).first, buf);
writeBinaryLittleEndian(this->data(place).last, buf);
writeBinaryLittleEndian(this->data(place).seen, buf);
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override
{
readIntBinary(this->data(place).sum, buf);
readIntBinary(this->data(place).first, buf);
readIntBinary(this->data(place).last, buf);
readPODBinary<bool>(this->data(place).seen, buf);
readBinaryLittleEndian(this->data(place).sum, buf);
readBinaryLittleEndian(this->data(place).first, buf);
readBinaryLittleEndian(this->data(place).last, buf);
readBinaryLittleEndian(this->data(place).seen, buf);
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override

View File

@ -144,22 +144,22 @@ public:
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
writeIntBinary(this->data(place).sum, buf);
writeIntBinary(this->data(place).first, buf);
writeIntBinary(this->data(place).first_ts, buf);
writeIntBinary(this->data(place).last, buf);
writeIntBinary(this->data(place).last_ts, buf);
writePODBinary<bool>(this->data(place).seen, buf);
writeBinaryLittleEndian(this->data(place).sum, buf);
writeBinaryLittleEndian(this->data(place).first, buf);
writeBinaryLittleEndian(this->data(place).first_ts, buf);
writeBinaryLittleEndian(this->data(place).last, buf);
writeBinaryLittleEndian(this->data(place).last_ts, buf);
writeBinaryLittleEndian(this->data(place).seen, buf);
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override
{
readIntBinary(this->data(place).sum, buf);
readIntBinary(this->data(place).first, buf);
readIntBinary(this->data(place).first_ts, buf);
readIntBinary(this->data(place).last, buf);
readIntBinary(this->data(place).last_ts, buf);
readPODBinary<bool>(this->data(place).seen, buf);
readBinaryLittleEndian(this->data(place).sum, buf);
readBinaryLittleEndian(this->data(place).first, buf);
readBinaryLittleEndian(this->data(place).first_ts, buf);
readBinaryLittleEndian(this->data(place).last, buf);
readBinaryLittleEndian(this->data(place).last_ts, buf);
readBinaryLittleEndian(this->data(place).seen, buf);
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override

View File

@ -266,19 +266,20 @@ public:
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
const auto & value = this->data(place).value;
size_t size = value.size();
const size_t size = value.size();
writeVarUInt(size, buf);
buf.write(reinterpret_cast<const char *>(value.data()), size * sizeof(value[0]));
for (const auto & element : value)
writeBinaryLittleEndian(element, buf);
if constexpr (Trait::last)
DB::writeIntBinary<size_t>(this->data(place).total_values, buf);
writeBinaryLittleEndian(this->data(place).total_values, buf);
if constexpr (Trait::sampler == Sampler::RNG)
{
DB::writeIntBinary<size_t>(this->data(place).total_values, buf);
writeBinaryLittleEndian(this->data(place).total_values, buf);
WriteBufferFromOwnString rng_buf;
rng_buf << this->data(place).rng;
DB::writeStringBinary(rng_buf.str(), buf);
writeStringBinary(rng_buf.str(), buf);
}
}
@ -297,16 +298,17 @@ public:
auto & value = this->data(place).value;
value.resize_exact(size, arena);
buf.readStrict(reinterpret_cast<char *>(value.data()), size * sizeof(value[0]));
for (auto & element : value)
readBinaryLittleEndian(element, buf);
if constexpr (Trait::last)
DB::readIntBinary<size_t>(this->data(place).total_values, buf);
readBinaryLittleEndian(this->data(place).total_values, buf);
if constexpr (Trait::sampler == Sampler::RNG)
{
DB::readIntBinary<size_t>(this->data(place).total_values, buf);
readBinaryLittleEndian(this->data(place).total_values, buf);
std::string rng_string;
DB::readStringBinary(rng_string, buf);
readStringBinary(rng_string, buf);
ReadBufferFromString rng_buf(rng_string);
rng_buf >> this->data(place).rng;
}
@ -603,14 +605,14 @@ public:
node->write(buf);
if constexpr (Trait::last)
DB::writeIntBinary<size_t>(data(place).total_values, buf);
writeBinaryLittleEndian(data(place).total_values, buf);
if constexpr (Trait::sampler == Sampler::RNG)
{
DB::writeIntBinary<size_t>(data(place).total_values, buf);
writeBinaryLittleEndian(data(place).total_values, buf);
WriteBufferFromOwnString rng_buf;
rng_buf << data(place).rng;
DB::writeStringBinary(rng_buf.str(), buf);
writeStringBinary(rng_buf.str(), buf);
}
}
@ -636,13 +638,13 @@ public:
value[i] = Node::read(buf, arena);
if constexpr (Trait::last)
DB::readIntBinary<size_t>(data(place).total_values, buf);
readBinaryLittleEndian(data(place).total_values, buf);
if constexpr (Trait::sampler == Sampler::RNG)
{
DB::readIntBinary<size_t>(data(place).total_values, buf);
readBinaryLittleEndian(data(place).total_values, buf);
std::string rng_string;
DB::readStringBinary(rng_string, buf);
readStringBinary(rng_string, buf);
ReadBufferFromString rng_buf(rng_string);
rng_buf >> data(place).rng;
}

View File

@ -233,35 +233,35 @@ public:
void write(WriteBuffer & buf) const
{
writeIntBinary<size_t>(compress_threshold, buf);
writeFloatBinary<double>(relative_error, buf);
writeIntBinary<size_t>(count, buf);
writeIntBinary<size_t>(sampled.size(), buf);
writeBinaryLittleEndian(compress_threshold, buf);
writeBinaryLittleEndian(relative_error, buf);
writeBinaryLittleEndian(count, buf);
writeBinaryLittleEndian(sampled.size(), buf);
for (const auto & stats : sampled)
{
writeFloatBinary<T>(stats.value, buf);
writeIntBinary<Int64>(stats.g, buf);
writeIntBinary<Int64>(stats.delta, buf);
writeBinaryLittleEndian(stats.value, buf);
writeBinaryLittleEndian(stats.g, buf);
writeBinaryLittleEndian(stats.delta, buf);
}
}
void read(ReadBuffer & buf)
{
readIntBinary<size_t>(compress_threshold, buf);
readFloatBinary<double>(relative_error, buf);
readIntBinary<size_t>(count, buf);
readBinaryLittleEndian(compress_threshold, buf);
readBinaryLittleEndian(relative_error, buf);
readBinaryLittleEndian(count, buf);
size_t sampled_len = 0;
readIntBinary<size_t>(sampled_len, buf);
readBinaryLittleEndian(sampled_len, buf);
sampled.resize(sampled_len);
for (size_t i = 0; i < sampled_len; ++i)
{
auto stats = sampled[i];
readFloatBinary<T>(stats.value, buf);
readIntBinary<Int64>(stats.g, buf);
readIntBinary<Int64>(stats.delta, buf);
readBinaryLittleEndian(stats.value, buf);
readBinaryLittleEndian(stats.g, buf);
readBinaryLittleEndian(stats.delta, buf);
}
}

View File

@ -207,8 +207,8 @@ public:
void read(DB::ReadBuffer & buf)
{
DB::readIntBinary<size_t>(sample_count, buf);
DB::readIntBinary<size_t>(total_values, buf);
DB::readBinaryLittleEndian(sample_count, buf);
DB::readBinaryLittleEndian(total_values, buf);
size_t size = std::min(total_values, sample_count);
static constexpr size_t MAX_RESERVOIR_SIZE = 1_GiB;
@ -224,22 +224,22 @@ public:
rng_buf >> rng;
for (size_t i = 0; i < samples.size(); ++i)
DB::readBinary(samples[i], buf);
DB::readBinaryLittleEndian(samples[i], buf);
sorted = false;
}
void write(DB::WriteBuffer & buf) const
{
DB::writeIntBinary<size_t>(sample_count, buf);
DB::writeIntBinary<size_t>(total_values, buf);
DB::writeBinaryLittleEndian(sample_count, buf);
DB::writeBinaryLittleEndian(total_values, buf);
DB::WriteBufferFromOwnString rng_buf;
rng_buf << rng;
DB::writeStringBinary(rng_buf.str(), buf);
for (size_t i = 0; i < std::min(sample_count, total_values); ++i)
DB::writeBinary(samples[i], buf);
DB::writeBinaryLittleEndian(samples[i], buf);
}
private:

View File

@ -6223,7 +6223,11 @@ void QueryAnalyzer::resolveTableFunction(QueryTreeNodePtr & table_function_node,
const auto & insertion_table = scope_context->getInsertionTable();
if (!insertion_table.empty())
{
const auto & insert_structure = DatabaseCatalog::instance().getTable(insertion_table, scope_context)->getInMemoryMetadataPtr()->getColumns();
const auto & insert_structure = DatabaseCatalog::instance()
.getTable(insertion_table, scope_context)
->getInMemoryMetadataPtr()
->getColumns()
.getInsertable();
DB::ColumnsDescription structure_hint;
bool use_columns_from_insert_query = true;

View File

@ -59,4 +59,10 @@ inline void transformEndianness(std::pair<A, B> & pair)
transformEndianness<endian>(pair.first);
transformEndianness<endian>(pair.second);
}
template <std::endian endian, typename T, typename Tag>
inline void transformEndianness(StrongTypedef<T, Tag> & x)
{
transformEndianness<endian>(x.toUnderType());
}
}

View File

@ -133,8 +133,6 @@ struct LowerUpperUTF8Impl
}
else
{
static const Poco::UTF8Encoding utf8;
size_t src_sequence_length = UTF8::seqLength(*src);
/// In case partial buffer was passed (due to SSE optimization)
/// we cannot convert it with current src_end, but we may have more

66
src/Functions/initcap.cpp Normal file
View File

@ -0,0 +1,66 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionStringToString.h>
#include <Common/StringUtils/StringUtils.h>
namespace DB
{
namespace
{
struct InitcapImpl
{
static void vector(const ColumnString::Chars & data,
const ColumnString::Offsets & offsets,
ColumnString::Chars & res_data,
ColumnString::Offsets & res_offsets)
{
if (data.empty())
return;
res_data.resize(data.size());
res_offsets.assign(offsets);
array(data.data(), data.data() + data.size(), res_data.data());
}
static void vectorFixed(const ColumnString::Chars & data, size_t /*n*/, ColumnString::Chars & res_data)
{
res_data.resize(data.size());
array(data.data(), data.data() + data.size(), res_data.data());
}
private:
static void array(const UInt8 * src, const UInt8 * src_end, UInt8 * dst)
{
bool prev_alphanum = false;
for (; src < src_end; ++src, ++dst)
{
char c = *src;
bool alphanum = isAlphaNumericASCII(c);
if (alphanum && !prev_alphanum)
if (isAlphaASCII(c))
*dst = toUpperIfAlphaASCII(c);
else
*dst = c;
else if (isAlphaASCII(c))
*dst = toLowerIfAlphaASCII(c);
else
*dst = c;
prev_alphanum = alphanum;
}
}
};
struct NameInitcap
{
static constexpr auto name = "initcap";
};
using FunctionInitcap = FunctionStringToString<InitcapImpl, NameInitcap>;
}
REGISTER_FUNCTION(Initcap)
{
factory.registerFunction<FunctionInitcap>({}, FunctionFactory::CaseInsensitive);
}
}

View File

@ -0,0 +1,114 @@
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionStringToString.h>
#include <Functions/LowerUpperUTF8Impl.h>
#include <Functions/FunctionFactory.h>
#include <Poco/Unicode.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace
{
struct InitcapUTF8Impl
{
static void vector(
const ColumnString::Chars & data,
const ColumnString::Offsets & offsets,
ColumnString::Chars & res_data,
ColumnString::Offsets & res_offsets)
{
if (data.empty())
return;
res_data.resize(data.size());
res_offsets.assign(offsets);
array(data.data(), data.data() + data.size(), offsets, res_data.data());
}
[[noreturn]] static void vectorFixed(const ColumnString::Chars &, size_t, ColumnString::Chars &)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Function initcapUTF8 cannot work with FixedString argument");
}
static void processCodePoint(const UInt8 *& src, const UInt8 * src_end, UInt8 *& dst, bool& prev_alphanum)
{
size_t src_sequence_length = UTF8::seqLength(*src);
auto src_code_point = UTF8::convertUTF8ToCodePoint(src, src_end - src);
if (src_code_point)
{
bool alpha = Poco::Unicode::isAlpha(*src_code_point);
bool alphanum = alpha || Poco::Unicode::isDigit(*src_code_point);
int dst_code_point = *src_code_point;
if (alphanum && !prev_alphanum)
{
if (alpha)
dst_code_point = Poco::Unicode::toUpper(*src_code_point);
}
else if (alpha)
{
dst_code_point = Poco::Unicode::toLower(*src_code_point);
}
prev_alphanum = alphanum;
if (dst_code_point > 0)
{
size_t dst_sequence_length = UTF8::convertCodePointToUTF8(dst_code_point, dst, src_end - src);
assert(dst_sequence_length <= 4);
if (dst_sequence_length == src_sequence_length)
{
src += dst_sequence_length;
dst += dst_sequence_length;
return;
}
}
}
*dst = *src;
++dst;
++src;
prev_alphanum = false;
}
private:
static void array(const UInt8 * src, const UInt8 * src_end, const ColumnString::Offsets & offsets, UInt8 * dst)
{
const auto * offset_it = offsets.begin();
const UInt8 * begin = src;
/// handle remaining symbols, row by row (to avoid influence of bad UTF8 symbols from one row, to another)
while (src < src_end)
{
const UInt8 * row_end = begin + *offset_it;
chassert(row_end >= src);
bool prev_alphanum = false;
while (src < row_end)
processCodePoint(src, row_end, dst, prev_alphanum);
++offset_it;
}
}
};
struct NameInitcapUTF8
{
static constexpr auto name = "initcapUTF8";
};
using FunctionInitcapUTF8 = FunctionStringToString<InitcapUTF8Impl, NameInitcapUTF8>;
}
REGISTER_FUNCTION(InitcapUTF8)
{
factory.registerFunction<FunctionInitcapUTF8>();
}
}

View File

@ -55,21 +55,10 @@ void AsynchronousInsertLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(event_time);
columns[i++]->insert(event_time_microseconds);
const auto & insert_query = assert_cast<const ASTInsertQuery &>(*query);
columns[i++]->insert(queryToString(insert_query));
if (insert_query.table_id)
{
columns[i++]->insert(insert_query.table_id.getDatabaseName());
columns[i++]->insert(insert_query.table_id.getTableName());
}
else
{
columns[i++]->insertDefault();
columns[i++]->insertDefault();
}
columns[i++]->insert(insert_query.format);
columns[i++]->insert(query_for_logging);
columns[i++]->insert(database);
columns[i++]->insert(table);
columns[i++]->insert(format);
columns[i++]->insert(query_id);
columns[i++]->insert(bytes);
columns[i++]->insert(rows);

View File

@ -21,8 +21,11 @@ struct AsynchronousInsertLogElement
time_t event_time{};
Decimal64 event_time_microseconds{};
ASTPtr query;
String query_id;
String query_for_logging;
String database;
String table;
String format;
UInt64 bytes{};
UInt64 rows{};
String exception;

View File

@ -1,33 +1,37 @@
#include <Interpreters/AsynchronousInsertQueue.h>
#include <Core/Settings.h>
#include <QueryPipeline/BlockIO.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/AsynchronousInsertLog.h>
#include <Processors/Transforms/getSourceFromASTInsertQuery.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Executors/StreamingFormatExecutor.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadBufferFromString.h>
#include <IO/LimitReadBuffer.h>
#include <IO/copyData.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/queryToString.h>
#include <Storages/IStorage.h>
#include <Common/CurrentThread.h>
#include <Common/SipHash.h>
#include <Common/FieldVisitorHash.h>
#include <Common/DateLUT.h>
#include <Access/Common/AccessFlags.h>
#include <Access/EnabledQuota.h>
#include <Core/Settings.h>
#include <Formats/FormatFactory.h>
#include <Common/logger_useful.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/LimitReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadBufferFromString.h>
#include <IO/copyData.h>
#include <Interpreters/AsynchronousInsertLog.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/executeQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/queryToString.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Executors/StreamingFormatExecutor.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Transforms/getSourceFromASTInsertQuery.h>
#include <QueryPipeline/BlockIO.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipeline.h>
#include <Storages/IStorage.h>
#include <Common/CurrentThread.h>
#include <Common/DateLUT.h>
#include <Common/FieldVisitorHash.h>
#include <Common/SensitiveDataMasker.h>
#include <Common/SipHash.h>
#include <Common/logger_useful.h>
namespace CurrentMetrics
@ -202,6 +206,7 @@ AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
query = query->clone();
const auto & settings = query_context->getSettingsRef();
auto & insert_query = query->as<ASTInsertQuery &>();
insert_query.async_insert_flush = true;
InterpreterInsertQuery interpreter(query, query_context, settings.insert_allow_materialized_columns);
auto table = interpreter.getTable(insert_query);
@ -398,6 +403,12 @@ try
const auto * log = &Poco::Logger::get("AsynchronousInsertQueue");
const auto & insert_query = assert_cast<const ASTInsertQuery &>(*key.query);
auto insert_context = Context::createCopy(global_context);
DB::CurrentThread::QueryScope query_scope_holder(insert_context);
bool internal = false; // To enable logging this query
bool async_insert = true;
/// Disabled query spans. Could be activated by initializing this to a SpanHolder
std::shared_ptr<OpenTelemetry::SpanHolder> query_span{nullptr};
/// 'resetParser' doesn't work for parallel parsing.
key.settings.set("input_format_parallel_parsing", false);
@ -405,12 +416,67 @@ try
insert_context->setSettings(key.settings);
/// Set initial_query_id, because it's used in InterpreterInsertQuery for table lock.
insert_context->getClientInfo().query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
insert_context->setCurrentQueryId("");
InterpreterInsertQuery interpreter(key.query, insert_context, key.settings.insert_allow_materialized_columns, false, false, true);
auto pipeline = interpreter.execute().pipeline;
assert(pipeline.pushing());
auto insert_query_id = insert_context->getCurrentQueryId();
auto query_start_time = std::chrono::system_clock::now();
Stopwatch start_watch{CLOCK_MONOTONIC};
ClientInfo & client_info = insert_context->getClientInfo();
client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
client_info.initial_query_start_time = timeInSeconds(query_start_time);
client_info.initial_query_start_time_microseconds = timeInMicroseconds(query_start_time);
client_info.current_query_id = insert_query_id;
client_info.initial_query_id = insert_query_id;
size_t log_queries_cut_to_length = insert_context->getSettingsRef().log_queries_cut_to_length;
String query_for_logging = insert_query.hasSecretParts()
? insert_query.formatForLogging(log_queries_cut_to_length)
: wipeSensitiveDataAndCutToLength(serializeAST(insert_query), log_queries_cut_to_length);
/// We add it to the process list so
/// a) it appears in system.processes
/// b) can be cancelled if we want to
/// c) has an associated process list element where runtime metrics are stored
auto process_list_entry
= insert_context->getProcessList().insert(query_for_logging, key.query.get(), insert_context, start_watch.getStart());
auto query_status = process_list_entry->getQueryStatus();
insert_context->setProcessListElement(std::move(query_status));
String query_database{};
String query_table{};
if (insert_query.table_id)
{
query_database = insert_query.table_id.getDatabaseName();
query_table = insert_query.table_id.getTableName();
insert_context->setInsertionTable(insert_query.table_id);
}
std::unique_ptr<DB::IInterpreter> interpreter;
QueryPipeline pipeline;
QueryLogElement query_log_elem;
try
{
interpreter = std::make_unique<InterpreterInsertQuery>(
key.query, insert_context, key.settings.insert_allow_materialized_columns, false, false, true);
pipeline = interpreter->execute().pipeline;
chassert(pipeline.pushing());
query_log_elem = logQueryStart(
query_start_time,
insert_context,
query_for_logging,
key.query,
pipeline,
interpreter,
internal,
query_database,
query_table,
async_insert);
}
catch (...)
{
logExceptionBeforeStart(query_for_logging, insert_context, key.query, query_span, start_watch.elapsedMilliseconds());
throw;
}
auto header = pipeline.getHeader();
auto format = getInputFormatFromASTInsertQuery(key.query, false, header, insert_context, nullptr);
@ -470,7 +536,10 @@ try
AsynchronousInsertLogElement elem;
elem.event_time = timeInSeconds(entry->create_time);
elem.event_time_microseconds = timeInMicroseconds(entry->create_time);
elem.query = key.query;
elem.query_for_logging = query_for_logging;
elem.database = query_database;
elem.table = query_table;
elem.format = insert_query.format;
elem.query_id = entry->query_id;
elem.bytes = bytes_size;
elem.rows = num_rows;
@ -493,7 +562,6 @@ try
}
format->addBuffer(std::move(last_buffer));
auto insert_query_id = insert_context->getCurrentQueryId();
ProfileEvents::increment(ProfileEvents::AsyncInsertRows, total_rows);
auto finish_entries = [&]
@ -531,9 +599,14 @@ try
LOG_INFO(log, "Flushed {} rows, {} bytes for query '{}'",
total_rows, total_bytes, key.query_str);
bool pulling_pipeline = false;
logQueryFinish(query_log_elem, insert_context, key.query, pipeline, pulling_pipeline, query_span, internal);
}
catch (...)
{
bool log_error = true;
logQueryException(query_log_elem, insert_context, start_watch, key.query, query_span, internal, log_error);
if (!log_elements.empty())
{
auto exception = getCurrentExceptionMessage(false);

View File

@ -1524,7 +1524,11 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression, const
uint64_t use_structure_from_insertion_table_in_table_functions = getSettingsRef().use_structure_from_insertion_table_in_table_functions;
if (use_structure_from_insertion_table_in_table_functions && table_function_ptr->needStructureHint() && hasInsertionTable())
{
const auto & insert_structure = DatabaseCatalog::instance().getTable(getInsertionTable(), shared_from_this())->getInMemoryMetadataPtr()->getColumns();
const auto & insert_structure = DatabaseCatalog::instance()
.getTable(getInsertionTable(), shared_from_this())
->getInMemoryMetadataPtr()
->getColumns()
.getInsertable();
DB::ColumnsDescription structure_hint;
bool use_columns_from_insert_query = true;

View File

@ -37,8 +37,8 @@ static bool isUnlimitedQuery(const IAST * ast)
if (!ast)
return false;
/// It is KILL QUERY
if (ast->as<ASTKillQueryQuery>())
/// It is KILL QUERY or an async insert flush query
if (ast->as<ASTKillQueryQuery>() || ast->getQueryKind() == IAST::QueryKind::AsyncInsertFlush)
return true;
/// It is SELECT FROM system.processes

View File

@ -393,7 +393,7 @@ public:
/** Register running query. Returns refcounted object, that will remove element from list in destructor.
* If too many running queries - wait for not more than specified (see settings) amount of time.
* If timeout is passed - throw an exception.
* Don't count KILL QUERY queries.
* Don't count KILL QUERY queries or async insert flush queries
*/
EntryPtr insert(const String & query_, const IAST * ast, ContextMutablePtr query_context, UInt64 watch_start_nanoseconds);

View File

@ -155,7 +155,6 @@ static void logQuery(const String & query, ContextPtr context, bool internal, Qu
}
}
/// Call this inside catch block.
static void setExceptionStackTrace(QueryLogElement & elem)
{
@ -208,7 +207,332 @@ static void logException(ContextPtr context, QueryLogElement & elem, bool log_er
LOG_INFO(&Poco::Logger::get("executeQuery"), message);
}
static void onExceptionBeforeStart(
static void
addStatusInfoToQueryElement(QueryLogElement & element, const QueryStatusInfo & info, const ASTPtr query_ast, const ContextPtr context_ptr)
{
const auto time_now = std::chrono::system_clock::now();
UInt64 elapsed_microseconds = info.elapsed_microseconds;
element.event_time = timeInSeconds(time_now);
element.event_time_microseconds = timeInMicroseconds(time_now);
element.query_duration_ms = elapsed_microseconds / 1000;
ProfileEvents::increment(ProfileEvents::QueryTimeMicroseconds, elapsed_microseconds);
if (query_ast->as<ASTSelectQuery>() || query_ast->as<ASTSelectWithUnionQuery>())
{
ProfileEvents::increment(ProfileEvents::SelectQueryTimeMicroseconds, elapsed_microseconds);
}
else if (query_ast->as<ASTInsertQuery>())
{
ProfileEvents::increment(ProfileEvents::InsertQueryTimeMicroseconds, elapsed_microseconds);
}
else
{
ProfileEvents::increment(ProfileEvents::OtherQueryTimeMicroseconds, elapsed_microseconds);
}
element.read_rows = info.read_rows;
element.read_bytes = info.read_bytes;
element.written_rows = info.written_rows;
element.written_bytes = info.written_bytes;
element.memory_usage = info.peak_memory_usage > 0 ? info.peak_memory_usage : 0;
element.thread_ids = info.thread_ids;
element.profile_counters = info.profile_counters;
/// We need to refresh the access info since dependent views might have added extra information, either during
/// creation of the view (PushingToViews chain) or while executing its internal SELECT
const auto & access_info = context_ptr->getQueryAccessInfo();
element.query_databases.insert(access_info.databases.begin(), access_info.databases.end());
element.query_tables.insert(access_info.tables.begin(), access_info.tables.end());
element.query_columns.insert(access_info.columns.begin(), access_info.columns.end());
element.query_partitions.insert(access_info.partitions.begin(), access_info.partitions.end());
element.query_projections.insert(access_info.projections.begin(), access_info.projections.end());
element.query_views.insert(access_info.views.begin(), access_info.views.end());
const auto & factories_info = context_ptr->getQueryFactoriesInfo();
element.used_aggregate_functions = factories_info.aggregate_functions;
element.used_aggregate_function_combinators = factories_info.aggregate_function_combinators;
element.used_database_engines = factories_info.database_engines;
element.used_data_type_families = factories_info.data_type_families;
element.used_dictionaries = factories_info.dictionaries;
element.used_formats = factories_info.formats;
element.used_functions = factories_info.functions;
element.used_storages = factories_info.storages;
element.used_table_functions = factories_info.table_functions;
element.async_read_counters = context_ptr->getAsyncReadCounters();
}
QueryLogElement logQueryStart(
const std::chrono::time_point<std::chrono::system_clock> & query_start_time,
const ContextMutablePtr & context,
const String & query_for_logging,
const ASTPtr & query_ast,
const QueryPipeline & pipeline,
const std::unique_ptr<IInterpreter> & interpreter,
bool internal,
const String & query_database,
const String & query_table,
bool async_insert)
{
const Settings & settings = context->getSettingsRef();
QueryLogElement elem;
elem.type = QueryLogElementType::QUERY_START;
elem.event_time = timeInSeconds(query_start_time);
elem.event_time_microseconds = timeInMicroseconds(query_start_time);
elem.query_start_time = timeInSeconds(query_start_time);
elem.query_start_time_microseconds = timeInMicroseconds(query_start_time);
elem.current_database = context->getCurrentDatabase();
elem.query = query_for_logging;
if (settings.log_formatted_queries)
elem.formatted_query = queryToString(query_ast);
elem.normalized_query_hash = normalizedQueryHash<false>(query_for_logging);
elem.query_kind = query_ast->getQueryKind();
elem.client_info = context->getClientInfo();
if (auto txn = context->getCurrentTransaction())
elem.tid = txn->tid;
bool log_queries = settings.log_queries && !internal;
/// Log into system table start of query execution, if need.
if (log_queries)
{
/// This check is not obvious, but without it 01220_scalar_optimization_in_alter fails.
if (pipeline.initialized())
{
const auto & info = context->getQueryAccessInfo();
elem.query_databases = info.databases;
elem.query_tables = info.tables;
elem.query_columns = info.columns;
elem.query_partitions = info.partitions;
elem.query_projections = info.projections;
elem.query_views = info.views;
}
if (async_insert)
InterpreterInsertQuery::extendQueryLogElemImpl(elem, context);
else if (interpreter)
interpreter->extendQueryLogElem(elem, query_ast, context, query_database, query_table);
if (settings.log_query_settings)
elem.query_settings = std::make_shared<Settings>(context->getSettingsRef());
elem.log_comment = settings.log_comment;
if (elem.log_comment.size() > settings.max_query_size)
elem.log_comment.resize(settings.max_query_size);
if (elem.type >= settings.log_queries_min_type && !settings.log_queries_min_query_duration_ms.totalMilliseconds())
{
if (auto query_log = context->getQueryLog())
query_log->add(elem);
}
}
return elem;
}
void logQueryFinish(
QueryLogElement & elem,
const ContextMutablePtr & context,
const ASTPtr & query_ast,
const QueryPipeline & query_pipeline,
bool pulling_pipeline,
std::shared_ptr<OpenTelemetry::SpanHolder> query_span,
bool internal)
{
const Settings & settings = context->getSettingsRef();
auto log_queries = settings.log_queries && !internal;
auto log_queries_min_type = settings.log_queries_min_type;
auto log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds();
auto log_processors_profiles = settings.log_processors_profiles;
QueryStatusPtr process_list_elem = context->getProcessListElement();
if (process_list_elem)
{
/// Update performance counters before logging to query_log
CurrentThread::finalizePerformanceCounters();
QueryStatusInfo info = process_list_elem->getInfo(true, context->getSettingsRef().log_profile_events);
elem.type = QueryLogElementType::QUERY_FINISH;
addStatusInfoToQueryElement(elem, info, query_ast, context);
if (pulling_pipeline)
{
query_pipeline.tryGetResultRowsAndBytes(elem.result_rows, elem.result_bytes);
}
else /// will be used only for ordinary INSERT queries
{
auto progress_out = process_list_elem->getProgressOut();
elem.result_rows = progress_out.written_rows;
elem.result_bytes = progress_out.written_bytes;
}
auto progress_callback = context->getProgressCallback();
if (progress_callback)
{
Progress p;
p.incrementPiecewiseAtomically(Progress{ResultProgress{elem.result_rows, elem.result_bytes}});
progress_callback(p);
}
if (elem.read_rows != 0)
{
double elapsed_seconds = static_cast<double>(info.elapsed_microseconds) / 1000000.0;
double rows_per_second = static_cast<double>(elem.read_rows) / elapsed_seconds;
LOG_DEBUG(
&Poco::Logger::get("executeQuery"),
"Read {} rows, {} in {} sec., {} rows/sec., {}/sec.",
elem.read_rows,
ReadableSize(elem.read_bytes),
elapsed_seconds,
rows_per_second,
ReadableSize(elem.read_bytes / elapsed_seconds));
}
if (log_queries && elem.type >= log_queries_min_type
&& static_cast<Int64>(elem.query_duration_ms) >= log_queries_min_query_duration_ms)
{
if (auto query_log = context->getQueryLog())
query_log->add(elem);
}
if (log_processors_profiles)
{
if (auto processors_profile_log = context->getProcessorsProfileLog())
{
ProcessorProfileLogElement processor_elem;
processor_elem.event_time = elem.event_time;
processor_elem.event_time_microseconds = elem.event_time_microseconds;
processor_elem.initial_query_id = elem.client_info.initial_query_id;
processor_elem.query_id = elem.client_info.current_query_id;
auto get_proc_id = [](const IProcessor & proc) -> UInt64 { return reinterpret_cast<std::uintptr_t>(&proc); };
for (const auto & processor : query_pipeline.getProcessors())
{
std::vector<UInt64> parents;
for (const auto & port : processor->getOutputs())
{
if (!port.isConnected())
continue;
const IProcessor & next = port.getInputPort().getProcessor();
parents.push_back(get_proc_id(next));
}
processor_elem.id = get_proc_id(*processor);
processor_elem.parent_ids = std::move(parents);
processor_elem.plan_step = reinterpret_cast<std::uintptr_t>(processor->getQueryPlanStep());
processor_elem.plan_group = processor->getQueryPlanStepGroup();
processor_elem.processor_name = processor->getName();
/// NOTE: convert this to UInt64
processor_elem.elapsed_us = static_cast<UInt32>(processor->getElapsedUs());
processor_elem.input_wait_elapsed_us = static_cast<UInt32>(processor->getInputWaitElapsedUs());
processor_elem.output_wait_elapsed_us = static_cast<UInt32>(processor->getOutputWaitElapsedUs());
auto stats = processor->getProcessorDataStats();
processor_elem.input_rows = stats.input_rows;
processor_elem.input_bytes = stats.input_bytes;
processor_elem.output_rows = stats.output_rows;
processor_elem.output_bytes = stats.output_bytes;
processors_profile_log->add(processor_elem);
}
}
}
}
if (query_span)
{
query_span->addAttribute("db.statement", elem.query);
query_span->addAttribute("clickhouse.query_id", elem.client_info.current_query_id);
query_span->addAttribute("clickhouse.query_status", "QueryFinish");
query_span->addAttributeIfNotEmpty("clickhouse.tracestate", OpenTelemetry::CurrentContext().tracestate);
query_span->addAttributeIfNotZero("clickhouse.read_rows", elem.read_rows);
query_span->addAttributeIfNotZero("clickhouse.read_bytes", elem.read_bytes);
query_span->addAttributeIfNotZero("clickhouse.written_rows", elem.written_rows);
query_span->addAttributeIfNotZero("clickhouse.written_bytes", elem.written_bytes);
query_span->addAttributeIfNotZero("clickhouse.memory_usage", elem.memory_usage);
query_span->finish();
}
}
void logQueryException(
QueryLogElement & elem,
const ContextMutablePtr & context,
const Stopwatch & start_watch,
const ASTPtr & query_ast,
std::shared_ptr<OpenTelemetry::SpanHolder> query_span,
bool internal,
bool log_error)
{
const Settings & settings = context->getSettingsRef();
auto log_queries = settings.log_queries && !internal;
auto log_queries_min_type = settings.log_queries_min_type;
auto log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds();
elem.type = QueryLogElementType::EXCEPTION_WHILE_PROCESSING;
elem.exception_code = getCurrentExceptionCode();
auto exception_message = getCurrentExceptionMessageAndPattern(/* with_stacktrace */ false);
elem.exception = std::move(exception_message.text);
elem.exception_format_string = exception_message.format_string;
QueryStatusPtr process_list_elem = context->getProcessListElement();
/// Update performance counters before logging to query_log
CurrentThread::finalizePerformanceCounters();
const auto time_now = std::chrono::system_clock::now();
elem.event_time = timeInSeconds(time_now);
elem.event_time_microseconds = timeInMicroseconds(time_now);
if (process_list_elem)
{
QueryStatusInfo info = process_list_elem->getInfo(true, settings.log_profile_events, false);
addStatusInfoToQueryElement(elem, info, query_ast, context);
}
else
{
elem.query_duration_ms = start_watch.elapsedMilliseconds();
}
if (settings.calculate_text_stack_trace && log_error)
setExceptionStackTrace(elem);
logException(context, elem, log_error);
/// In case of exception we log internal queries also
if (log_queries && elem.type >= log_queries_min_type && static_cast<Int64>(elem.query_duration_ms) >= log_queries_min_query_duration_ms)
{
if (auto query_log = context->getQueryLog())
query_log->add(elem);
}
ProfileEvents::increment(ProfileEvents::FailedQuery);
if (query_ast->as<ASTSelectQuery>() || query_ast->as<ASTSelectWithUnionQuery>())
ProfileEvents::increment(ProfileEvents::FailedSelectQuery);
else if (query_ast->as<ASTInsertQuery>())
ProfileEvents::increment(ProfileEvents::FailedInsertQuery);
if (query_span)
{
query_span->addAttribute("db.statement", elem.query);
query_span->addAttribute("clickhouse.query_id", elem.client_info.current_query_id);
query_span->addAttribute("clickhouse.exception", elem.exception);
query_span->addAttribute("clickhouse.exception_code", elem.exception_code);
query_span->finish();
}
}
void logExceptionBeforeStart(
const String & query_for_logging,
ContextPtr context,
ASTPtr ast,
@ -431,7 +755,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
logQuery(query_for_logging, context, internal, stage);
if (!internal)
onExceptionBeforeStart(query_for_logging, context, ast, query_span, start_watch.elapsedMilliseconds());
logExceptionBeforeStart(query_for_logging, context, ast, query_span, start_watch.elapsedMilliseconds());
throw;
}
@ -804,132 +1128,23 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// Everything related to query log.
{
QueryLogElement elem;
elem.type = QueryLogElementType::QUERY_START;
elem.event_time = timeInSeconds(query_start_time);
elem.event_time_microseconds = timeInMicroseconds(query_start_time);
elem.query_start_time = timeInSeconds(query_start_time);
elem.query_start_time_microseconds = timeInMicroseconds(query_start_time);
elem.current_database = context->getCurrentDatabase();
elem.query = query_for_logging;
if (settings.log_formatted_queries)
elem.formatted_query = queryToString(ast);
elem.normalized_query_hash = normalizedQueryHash<false>(query_for_logging);
elem.query_kind = ast->getQueryKind();
elem.client_info = client_info;
if (auto txn = context->getCurrentTransaction())
elem.tid = txn->tid;
bool log_queries = settings.log_queries && !internal;
/// Log into system table start of query execution, if need.
if (log_queries)
{
/// This check is not obvious, but without it 01220_scalar_optimization_in_alter fails.
if (pipeline.initialized())
{
const auto & info = context->getQueryAccessInfo();
elem.query_databases = info.databases;
elem.query_tables = info.tables;
elem.query_columns = info.columns;
elem.query_partitions = info.partitions;
elem.query_projections = info.projections;
elem.query_views = info.views;
}
if (async_insert)
InterpreterInsertQuery::extendQueryLogElemImpl(elem, context);
else if (interpreter)
interpreter->extendQueryLogElem(elem, ast, context, query_database, query_table);
if (settings.log_query_settings)
elem.query_settings = std::make_shared<Settings>(context->getSettingsRef());
elem.log_comment = settings.log_comment;
if (elem.log_comment.size() > settings.max_query_size)
elem.log_comment.resize(settings.max_query_size);
if (elem.type >= settings.log_queries_min_type && !settings.log_queries_min_query_duration_ms.totalMilliseconds())
{
if (auto query_log = context->getQueryLog())
query_log->add(elem);
}
}
/// Common code for finish and exception callbacks
auto status_info_to_query_log
= [](QueryLogElement & element, const QueryStatusInfo & info, const ASTPtr query_ast, const ContextPtr context_ptr) mutable
{
const auto time_now = std::chrono::system_clock::now();
UInt64 elapsed_microseconds = info.elapsed_microseconds;
element.event_time = timeInSeconds(time_now);
element.event_time_microseconds = timeInMicroseconds(time_now);
element.query_duration_ms = elapsed_microseconds / 1000;
ProfileEvents::increment(ProfileEvents::QueryTimeMicroseconds, elapsed_microseconds);
if (query_ast->as<ASTSelectQuery>() || query_ast->as<ASTSelectWithUnionQuery>())
{
ProfileEvents::increment(ProfileEvents::SelectQueryTimeMicroseconds, elapsed_microseconds);
}
else if (query_ast->as<ASTInsertQuery>())
{
ProfileEvents::increment(ProfileEvents::InsertQueryTimeMicroseconds, elapsed_microseconds);
}
else
{
ProfileEvents::increment(ProfileEvents::OtherQueryTimeMicroseconds, elapsed_microseconds);
}
element.read_rows = info.read_rows;
element.read_bytes = info.read_bytes;
element.written_rows = info.written_rows;
element.written_bytes = info.written_bytes;
element.memory_usage = info.peak_memory_usage > 0 ? info.peak_memory_usage : 0;
element.thread_ids = info.thread_ids;
element.profile_counters = info.profile_counters;
/// We need to refresh the access info since dependent views might have added extra information, either during
/// creation of the view (PushingToViews chain) or while executing its internal SELECT
const auto & access_info = context_ptr->getQueryAccessInfo();
element.query_databases.insert(access_info.databases.begin(), access_info.databases.end());
element.query_tables.insert(access_info.tables.begin(), access_info.tables.end());
element.query_columns.insert(access_info.columns.begin(), access_info.columns.end());
element.query_partitions.insert(access_info.partitions.begin(), access_info.partitions.end());
element.query_projections.insert(access_info.projections.begin(), access_info.projections.end());
element.query_views.insert(access_info.views.begin(), access_info.views.end());
const auto & factories_info = context_ptr->getQueryFactoriesInfo();
element.used_aggregate_functions = factories_info.aggregate_functions;
element.used_aggregate_function_combinators = factories_info.aggregate_function_combinators;
element.used_database_engines = factories_info.database_engines;
element.used_data_type_families = factories_info.data_type_families;
element.used_dictionaries = factories_info.dictionaries;
element.used_formats = factories_info.formats;
element.used_functions = factories_info.functions;
element.used_storages = factories_info.storages;
element.used_table_functions = factories_info.table_functions;
element.async_read_counters = context_ptr->getAsyncReadCounters();
};
QueryLogElement elem = logQueryStart(
query_start_time,
context,
query_for_logging,
ast,
pipeline,
interpreter,
internal,
query_database,
query_table,
async_insert);
/// Also make possible for caller to log successful query finish and exception during execution.
auto finish_callback = [elem,
context,
ast,
write_into_query_cache,
log_queries,
log_queries_min_type = settings.log_queries_min_type,
log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds(),
log_processors_profiles = settings.log_processors_profiles,
status_info_to_query_log,
internal,
implicit_txn_control,
execute_implicit_tcl_query,
pulling_pipeline = pipeline.pulling(),
@ -940,137 +1155,15 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// partial/garbage results in case of exceptions during query execution.
query_pipeline.finalizeWriteInQueryCache();
QueryStatusPtr process_list_elem = context->getProcessListElement();
if (process_list_elem)
{
/// Update performance counters before logging to query_log
CurrentThread::finalizePerformanceCounters();
QueryStatusInfo info = process_list_elem->getInfo(true, context->getSettingsRef().log_profile_events);
elem.type = QueryLogElementType::QUERY_FINISH;
status_info_to_query_log(elem, info, ast, context);
if (pulling_pipeline)
{
query_pipeline.tryGetResultRowsAndBytes(elem.result_rows, elem.result_bytes);
}
else /// will be used only for ordinary INSERT queries
{
auto progress_out = process_list_elem->getProgressOut();
elem.result_rows = progress_out.written_rows;
elem.result_bytes = progress_out.written_bytes;
}
auto progress_callback = context->getProgressCallback();
if (progress_callback)
{
Progress p;
p.incrementPiecewiseAtomically(Progress{ResultProgress{elem.result_rows, elem.result_bytes}});
progress_callback(p);
}
if (elem.read_rows != 0)
{
double elapsed_seconds = static_cast<double>(info.elapsed_microseconds) / 1000000.0;
double rows_per_second = static_cast<double>(elem.read_rows) / elapsed_seconds;
LOG_DEBUG(
&Poco::Logger::get("executeQuery"),
"Read {} rows, {} in {} sec., {} rows/sec., {}/sec.",
elem.read_rows,
ReadableSize(elem.read_bytes),
elapsed_seconds,
rows_per_second,
ReadableSize(elem.read_bytes / elapsed_seconds));
}
if (log_queries && elem.type >= log_queries_min_type && static_cast<Int64>(elem.query_duration_ms) >= log_queries_min_query_duration_ms)
{
if (auto query_log = context->getQueryLog())
query_log->add(elem);
}
if (log_processors_profiles)
{
if (auto processors_profile_log = context->getProcessorsProfileLog())
{
ProcessorProfileLogElement processor_elem;
processor_elem.event_time = elem.event_time;
processor_elem.event_time_microseconds = elem.event_time_microseconds;
processor_elem.initial_query_id = elem.client_info.initial_query_id;
processor_elem.query_id = elem.client_info.current_query_id;
auto get_proc_id = [](const IProcessor & proc) -> UInt64
{
return reinterpret_cast<std::uintptr_t>(&proc);
};
for (const auto & processor : query_pipeline.getProcessors())
{
std::vector<UInt64> parents;
for (const auto & port : processor->getOutputs())
{
if (!port.isConnected())
continue;
const IProcessor & next = port.getInputPort().getProcessor();
parents.push_back(get_proc_id(next));
}
processor_elem.id = get_proc_id(*processor);
processor_elem.parent_ids = std::move(parents);
processor_elem.plan_step = reinterpret_cast<std::uintptr_t>(processor->getQueryPlanStep());
processor_elem.plan_group = processor->getQueryPlanStepGroup();
processor_elem.processor_name = processor->getName();
/// NOTE: convert this to UInt64
processor_elem.elapsed_us = static_cast<UInt32>(processor->getElapsedUs());
processor_elem.input_wait_elapsed_us = static_cast<UInt32>(processor->getInputWaitElapsedUs());
processor_elem.output_wait_elapsed_us = static_cast<UInt32>(processor->getOutputWaitElapsedUs());
auto stats = processor->getProcessorDataStats();
processor_elem.input_rows = stats.input_rows;
processor_elem.input_bytes = stats.input_bytes;
processor_elem.output_rows = stats.output_rows;
processor_elem.output_bytes = stats.output_bytes;
processors_profile_log->add(processor_elem);
}
}
}
logQueryFinish(elem, context, ast, query_pipeline, pulling_pipeline, query_span, internal);
if (*implicit_txn_control)
execute_implicit_tcl_query(context, ASTTransactionControl::COMMIT);
}
if (query_span)
{
query_span->addAttribute("db.statement", elem.query);
query_span->addAttribute("clickhouse.query_id", elem.client_info.current_query_id);
query_span->addAttribute("clickhouse.query_status", "QueryFinish");
query_span->addAttributeIfNotEmpty("clickhouse.tracestate", OpenTelemetry::CurrentContext().tracestate);
query_span->addAttributeIfNotZero("clickhouse.read_rows", elem.read_rows);
query_span->addAttributeIfNotZero("clickhouse.read_bytes", elem.read_bytes);
query_span->addAttributeIfNotZero("clickhouse.written_rows", elem.written_rows);
query_span->addAttributeIfNotZero("clickhouse.written_bytes", elem.written_bytes);
query_span->addAttributeIfNotZero("clickhouse.memory_usage", elem.memory_usage);
query_span->finish();
}
};
auto exception_callback = [start_watch,
elem,
context,
ast,
log_queries,
log_queries_min_type = settings.log_queries_min_type,
log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds(),
my_quota(quota),
status_info_to_query_log,
implicit_txn_control,
execute_implicit_tcl_query,
query_span](bool log_error) mutable
auto exception_callback =
[start_watch, elem, context, ast, internal, my_quota(quota), implicit_txn_control, execute_implicit_tcl_query, query_span](
bool log_error) mutable
{
if (*implicit_txn_control)
execute_implicit_tcl_query(context, ASTTransactionControl::ROLLBACK);
@ -1080,60 +1173,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (my_quota)
my_quota->used(QuotaType::ERRORS, 1, /* check_exceeded = */ false);
elem.type = QueryLogElementType::EXCEPTION_WHILE_PROCESSING;
elem.exception_code = getCurrentExceptionCode();
auto exception_message = getCurrentExceptionMessageAndPattern(/* with_stacktrace */ false);
elem.exception = std::move(exception_message.text);
elem.exception_format_string = exception_message.format_string;
QueryStatusPtr process_list_elem = context->getProcessListElement();
const Settings & current_settings = context->getSettingsRef();
/// Update performance counters before logging to query_log
CurrentThread::finalizePerformanceCounters();
const auto time_now = std::chrono::system_clock::now();
elem.event_time = timeInSeconds(time_now);
elem.event_time_microseconds = timeInMicroseconds(time_now);
if (process_list_elem)
{
QueryStatusInfo info = process_list_elem->getInfo(true, current_settings.log_profile_events, false);
status_info_to_query_log(elem, info, ast, context);
}
else
{
elem.query_duration_ms = start_watch.elapsedMilliseconds();
}
if (current_settings.calculate_text_stack_trace && log_error)
setExceptionStackTrace(elem);
logException(context, elem, log_error);
/// In case of exception we log internal queries also
if (log_queries && elem.type >= log_queries_min_type && static_cast<Int64>(elem.query_duration_ms) >= log_queries_min_query_duration_ms)
{
if (auto query_log = context->getQueryLog())
query_log->add(elem);
}
ProfileEvents::increment(ProfileEvents::FailedQuery);
if (ast->as<ASTSelectQuery>() || ast->as<ASTSelectWithUnionQuery>())
{
ProfileEvents::increment(ProfileEvents::FailedSelectQuery);
}
else if (ast->as<ASTInsertQuery>())
{
ProfileEvents::increment(ProfileEvents::FailedInsertQuery);
}
if (query_span)
{
query_span->addAttribute("db.statement", elem.query);
query_span->addAttribute("clickhouse.query_id", elem.client_info.current_query_id);
query_span->addAttribute("clickhouse.exception", elem.exception);
query_span->addAttribute("clickhouse.exception_code", elem.exception_code);
query_span->finish();
}
logQueryException(elem, context, start_watch, ast, query_span, internal, log_error);
};
res.finish_callback = std::move(finish_callback);
@ -1148,7 +1188,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
txn->onException();
if (!internal)
onExceptionBeforeStart(query_for_logging, context, ast, query_span, start_watch.elapsedMilliseconds());
logExceptionBeforeStart(query_for_logging, context, ast, query_span, start_watch.elapsedMilliseconds());
throw;
}

View File

@ -1,15 +1,21 @@
#pragma once
#include <Core/QueryProcessingStage.h>
#include <QueryPipeline/BlockIO.h>
#include <Interpreters/Context_fwd.h>
#include <Formats/FormatSettings.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/QueryLog.h>
#include <QueryPipeline/BlockIO.h>
#include <memory>
#include <optional>
namespace DB
{
class IInterpreter;
class ReadBuffer;
class WriteBuffer;
struct QueryStatusInfo;
struct QueryResultDetails
{
@ -66,4 +72,41 @@ BlockIO executeQuery(
/// if built pipeline does not require any input and does not produce any output.
void executeTrivialBlockIO(BlockIO & streams, ContextPtr context);
/// Prepares a QueryLogElement and, if enabled, logs it to system.query_log
QueryLogElement logQueryStart(
const std::chrono::time_point<std::chrono::system_clock> & query_start_time,
const ContextMutablePtr & context,
const String & query_for_logging,
const ASTPtr & query_ast,
const QueryPipeline & pipeline,
const std::unique_ptr<IInterpreter> & interpreter,
bool internal,
const String & query_database,
const String & query_table,
bool async_insert);
void logQueryFinish(
QueryLogElement & elem,
const ContextMutablePtr & context,
const ASTPtr & query_ast,
const QueryPipeline & query_pipeline,
bool pulling_pipeline,
std::shared_ptr<OpenTelemetry::SpanHolder> query_span,
bool internal);
void logQueryException(
QueryLogElement & elem,
const ContextMutablePtr & context,
const Stopwatch & start_watch,
const ASTPtr & query_ast,
std::shared_ptr<OpenTelemetry::SpanHolder> query_span,
bool internal,
bool log_error);
void logExceptionBeforeStart(
const String & query_for_logging,
ContextPtr context,
ASTPtr ast,
const std::shared_ptr<OpenTelemetry::SpanHolder> & query_span,
UInt64 elapsed_millliseconds);
}

View File

@ -35,6 +35,8 @@ public:
/// Data from buffer to insert after inlined one - may be nullptr.
ReadBuffer * tail = nullptr;
bool async_insert_flush = false;
String getDatabase() const;
String getTable() const;
@ -66,7 +68,7 @@ public:
return res;
}
QueryKind getQueryKind() const override { return QueryKind::Insert; }
QueryKind getQueryKind() const override { return async_insert_flush ? QueryKind::AsyncInsertFlush : QueryKind::Insert; }
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;

View File

@ -305,6 +305,7 @@ public:
Commit,
Rollback,
SetTransactionSnapshot,
AsyncInsertFlush
};
/// Return QueryKind of this AST query.
virtual QueryKind getQueryKind() const { return QueryKind::None; }

View File

@ -810,7 +810,6 @@ class ClickhouseIntegrationTestsRunner:
result_state = "failure"
if not should_fail:
break
assert should_fail
logging.info("Try is OK, all tests passed, going to clear env")
clear_ip_tables_and_restart_daemons()
logging.info("And going to sleep for some time")

View File

@ -818,9 +818,10 @@ def test_start_stop_moves(start_cluster, name, engine):
node1.query(f"SYSTEM STOP MOVES {name}")
node1.query(f"SYSTEM STOP MERGES {name}")
first_part = None
for i in range(5):
data = [] # 5MB in total
for i in range(5):
for _ in range(5):
data.append(get_random_string(1024 * 1024)) # 1MB row
# jbod size is 40MB, so lets insert 5MB batch 7 times
node1.query_with_retry(
@ -829,8 +830,14 @@ def test_start_stop_moves(start_cluster, name, engine):
)
)
# we cannot rely simply on modification time of part because it can be changed
# by different background operations so we explicitly check after the first
# part is inserted
if i == 0:
first_part = get_oldest_part(node1, name)
assert first_part is not None
used_disks = get_used_disks_for_table(node1, name)
retry = 5

View File

@ -18,7 +18,7 @@ select distinct a from distinct_in_order settings max_block_size=10, max_threads
select '-- create table with not only primary key columns';
drop table if exists distinct_in_order sync;
create table distinct_in_order (a int, b int, c int) engine=MergeTree() order by (a, b);
create table distinct_in_order (a int, b int, c int) engine=MergeTree() order by (a, b) SETTINGS index_granularity = 8192, index_granularity_bytes = '10Mi';
insert into distinct_in_order select number % number, number % 5, number % 10 from numbers(1,1000000);
select '-- distinct with primary key prefix only';
@ -59,16 +59,16 @@ drop table if exists distinct_in_order sync;
select '-- check that distinct in order returns the same result as ordinary distinct';
drop table if exists distinct_cardinality_low sync;
CREATE TABLE distinct_cardinality_low (low UInt64, medium UInt64, high UInt64) ENGINE MergeTree() ORDER BY (low, medium);
CREATE TABLE distinct_cardinality_low (low UInt64, medium UInt64, high UInt64) ENGINE MergeTree() ORDER BY (low, medium) SETTINGS index_granularity = 8192, index_granularity_bytes = '10Mi';
INSERT INTO distinct_cardinality_low SELECT number % 1e1, number % 1e2, number % 1e3 FROM numbers_mt(1e4);
drop table if exists distinct_in_order sync;
drop table if exists ordinary_distinct sync;
select '-- check that distinct in order WITH order by returns the same result as ordinary distinct';
create table distinct_in_order (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium);
create table distinct_in_order (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium) SETTINGS index_granularity = 8192, index_granularity_bytes = '10Mi';
insert into distinct_in_order select distinct * from distinct_cardinality_low order by high settings optimize_distinct_in_order=1;
create table ordinary_distinct (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium);
create table ordinary_distinct (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium) SETTINGS index_granularity = 8192, index_granularity_bytes = '10Mi';
insert into ordinary_distinct select distinct * from distinct_cardinality_low order by high settings optimize_distinct_in_order=0;
select count() as diff from (select distinct * from distinct_in_order except select * from ordinary_distinct);
@ -76,9 +76,9 @@ drop table if exists distinct_in_order sync;
drop table if exists ordinary_distinct sync;
select '-- check that distinct in order WITHOUT order by returns the same result as ordinary distinct';
create table distinct_in_order (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium);
create table distinct_in_order (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium) SETTINGS index_granularity = 8192, index_granularity_bytes = '10Mi';
insert into distinct_in_order select distinct * from distinct_cardinality_low settings optimize_distinct_in_order=1;
create table ordinary_distinct (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium);
create table ordinary_distinct (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium) SETTINGS index_granularity = 8192, index_granularity_bytes = '10Mi';
insert into ordinary_distinct select distinct * from distinct_cardinality_low settings optimize_distinct_in_order=0;
select count() as diff from (select distinct * from distinct_in_order except select * from ordinary_distinct);
@ -86,9 +86,9 @@ drop table if exists distinct_in_order;
drop table if exists ordinary_distinct;
select '-- check that distinct in order WITHOUT order by and WITH filter returns the same result as ordinary distinct';
create table distinct_in_order (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium);
create table distinct_in_order (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium) SETTINGS index_granularity = 8192, index_granularity_bytes = '10Mi';
insert into distinct_in_order select distinct * from distinct_cardinality_low where low > 0 settings optimize_distinct_in_order=1;
create table ordinary_distinct (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium);
create table ordinary_distinct (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium) SETTINGS index_granularity = 8192, index_granularity_bytes = '10Mi';
insert into ordinary_distinct select distinct * from distinct_cardinality_low where low > 0 settings optimize_distinct_in_order=0;
select count() as diff from (select distinct * from distinct_in_order except select * from ordinary_distinct);
@ -102,12 +102,12 @@ drop table if exists sorting_key_contain_function;
select '-- bug 42185, distinct in order and empty sort description';
select '-- distinct in order, sorting key tuple()';
create table sorting_key_empty_tuple (a int, b int) engine=MergeTree() order by tuple();
create table sorting_key_empty_tuple (a int, b int) engine=MergeTree() order by tuple() SETTINGS index_granularity = 8192, index_granularity_bytes = '10Mi';
insert into sorting_key_empty_tuple select number % 2, number % 5 from numbers(1,10);
select distinct a from sorting_key_empty_tuple;
select '-- distinct in order, sorting key contains function';
create table sorting_key_contain_function (datetime DateTime, a int) engine=MergeTree() order by (toDate(datetime));
create table sorting_key_contain_function (datetime DateTime, a int) engine=MergeTree() order by (toDate(datetime)) SETTINGS index_granularity = 8192, index_granularity_bytes = '10Mi';
insert into sorting_key_contain_function values ('2000-01-01', 1);
insert into sorting_key_contain_function values ('2000-01-01', 2);
select distinct datetime from sorting_key_contain_function;

View File

@ -364,6 +364,8 @@ in
inIgnoreSet
indexHint
indexOf
initcap
initcapUTF8
initialQueryID
initializeAggregation
intDiv

View File

@ -0,0 +1,135 @@
system.query_log
Row 1:
──────
type: QueryStart
read_rows: 0
read_bytes: 0
written_rows: 0
written_bytes: 0
result_rows: 0
result_bytes: 0
query: INSERT INTO default.async_insert_landing SETTINGS wait_for_async_insert = 1, async_insert = 1 FORMAT Values
query_kind: AsyncInsertFlush
databases: ['default']
tables: ['default.async_insert_landing']
columns: []
views: []
exception_code: 0
Row 2:
──────
type: QueryFinish
read_rows: 0
read_bytes: 0
written_rows: 4
written_bytes: 16
result_rows: 4
result_bytes: 16
query: INSERT INTO default.async_insert_landing SETTINGS wait_for_async_insert = 1, async_insert = 1 FORMAT Values
query_kind: AsyncInsertFlush
databases: ['default']
tables: ['default.async_insert_landing']
columns: []
views: []
exception_code: 0
system.query_views_log
system.query_log
Row 1:
──────
type: QueryStart
read_rows: 0
read_bytes: 0
written_rows: 0
written_bytes: 0
result_rows: 0
result_bytes: 0
query: INSERT INTO default.async_insert_landing SETTINGS wait_for_async_insert = 1, async_insert = 1 FORMAT Values
query_kind: AsyncInsertFlush
databases: ['default']
tables: ['default.async_insert_landing','default.async_insert_target']
columns: []
views: ['default.async_insert_mv']
exception_code: 0
Row 2:
──────
type: QueryFinish
read_rows: 3
read_bytes: 12
written_rows: 6
written_bytes: 12
result_rows: 6
result_bytes: 12
query: INSERT INTO default.async_insert_landing SETTINGS wait_for_async_insert = 1, async_insert = 1 FORMAT Values
query_kind: AsyncInsertFlush
databases: ['default']
tables: ['default.async_insert_landing','default.async_insert_target']
columns: ['default.async_insert_landing.id']
views: ['default.async_insert_mv']
exception_code: 0
system.query_views_log
Row 1:
──────
view_name: default.async_insert_mv
view_type: Materialized
view_query: SELECT id + throwIf(id = 42) FROM default.async_insert_landing
view_target: default.async_insert_target
read_rows: 3
read_bytes: 12
written_rows: 3
written_bytes: 0
status: QueryFinish
exception_code: 0
system.query_log
Row 1:
──────
type: QueryStart
read_rows: 0
read_bytes: 0
written_rows: 0
written_bytes: 0
result_rows: 0
result_bytes: 0
query: INSERT INTO default.async_insert_landing SETTINGS wait_for_async_insert = 1, async_insert = 1 FORMAT Values
query_kind: AsyncInsertFlush
databases: ['default']
tables: ['default.async_insert_landing','default.async_insert_target']
columns: []
views: ['default.async_insert_mv']
exception_code: 0
Row 2:
──────
type: Exc*****onWhileProcessing
read_rows: 3
read_bytes: 12
written_rows: 3
written_bytes: 12
result_rows: 0
result_bytes: 0
query: INSERT INTO default.async_insert_landing SETTINGS wait_for_async_insert = 1, async_insert = 1 FORMAT Values
query_kind: AsyncInsertFlush
databases: ['default']
tables: ['default.async_insert_landing','default.async_insert_target']
columns: ['default.async_insert_landing.id']
views: ['default.async_insert_mv']
exception_code: 395
system.query_views_log
Row 1:
──────
view_name: default.async_insert_mv
view_type: Materialized
view_query: SELECT id + throwIf(id = 42) FROM default.async_insert_landing
view_target: default.async_insert_target
read_rows: 3
read_bytes: 12
written_rows: 0
written_bytes: 0
status: Exc*****onWhileProcessing
exception_code: 395

View File

@ -0,0 +1,75 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
function print_flush_query_logs()
{
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
echo ""
echo "system.query_log"
${CLICKHOUSE_CLIENT} -q "
SELECT
replace(type::String, 'Exception', 'Exc*****on') as type,
read_rows,
read_bytes,
written_rows,
written_bytes,
result_rows,
result_bytes,
query,
query_kind,
databases,
tables,
columns,
views,
exception_code
FROM system.query_log
WHERE
event_date >= yesterday()
AND initial_query_id = (SELECT flush_query_id FROM system.asynchronous_insert_log WHERE query_id = '$1')
-- AND current_database = currentDatabase() -- Just to silence style check: this is not ok for this test since the query uses default values
ORDER BY type DESC
FORMAT Vertical"
echo ""
echo "system.query_views_log"
${CLICKHOUSE_CLIENT} -q "
SELECT
view_name,
view_type,
view_query,
view_target,
read_rows,
read_bytes,
written_rows,
written_bytes,
replace(status::String, 'Exception', 'Exc*****on') as status,
exception_code
FROM system.query_views_log
WHERE
event_date >= yesterday()
AND initial_query_id = (SELECT flush_query_id FROM system.asynchronous_insert_log WHERE query_id = '$1')
FORMAT Vertical"
}
${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_insert_landing (id UInt32) ENGINE = MergeTree ORDER BY id"
query_id="$(random_str 10)"
${CLICKHOUSE_CLIENT} --query_id="${query_id}" -q "INSERT INTO async_insert_landing SETTINGS wait_for_async_insert=1, async_insert=1 values (1), (2), (3), (4);"
print_flush_query_logs ${query_id}
${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_insert_target (id UInt32) ENGINE = MergeTree ORDER BY id"
${CLICKHOUSE_CLIENT} -q "CREATE MATERIALIZED VIEW async_insert_mv TO async_insert_target AS SELECT id + throwIf(id = 42) FROM async_insert_landing"
query_id="$(random_str 10)"
${CLICKHOUSE_CLIENT} --query_id="${query_id}" -q "INSERT INTO async_insert_landing SETTINGS wait_for_async_insert=1, async_insert=1 values (11), (12), (13);"
print_flush_query_logs ${query_id}
query_id="$(random_str 10)"
${CLICKHOUSE_CLIENT} --query_id="${query_id}" -q "INSERT INTO async_insert_landing SETTINGS wait_for_async_insert=1, async_insert=1 values (42), (12), (13)" 2>/dev/null || true
print_flush_query_logs ${query_id}

View File

@ -0,0 +1,13 @@
Hello
Hello
Hello World
Yeah, Well, I`M Gonna Go Build My Own Theme Park
Crc32ieee Is The Best Function
42ok
Hello
Yeah, Well, I`M Gonna Go Build My Own Theme Park
Привет, Как Дела?
Ätsch, Bätsch
We Dont Support Cases When Lowercase And Uppercase Characters Occupy Different Number Of Bytes In Utf-8. As An Example, This Happens For ß And ẞ.

View File

@ -0,0 +1,14 @@
select initcap('');
select initcap('Hello');
select initcap('hello');
select initcap('hello world');
select initcap('yeah, well, i`m gonna go build my own theme park');
select initcap('CRC32IEEE is the best function');
select initcap('42oK');
select initcapUTF8('');
select initcapUTF8('Hello');
select initcapUTF8('yeah, well, i`m gonna go build my own theme park');
select initcapUTF8('привет, как дела?');
select initcapUTF8('ätsch, bätsch');
select initcapUTF8('We dont support cases when lowercase and uppercase characters occupy different number of bytes in UTF-8. As an example, this happens for ß and ẞ.');

View File

@ -0,0 +1,9 @@
drop table if exists test;
create table test
(
n1 UInt32,
n2 UInt32 alias murmurHash3_32(n1),
n3 UInt32 materialized n2 + 1
)engine=MergeTree order by n1;
insert into test select * from generateRandom() limit 10;
drop table test;

View File

@ -1582,6 +1582,8 @@ indexOf
infi
initialQueryID
initializeAggregation
initcap
initcapUTF
injective
innogames
inodes