This commit is contained in:
Alexander Kuzmenkov 2020-11-10 08:50:32 +03:00
parent 26229ed231
commit 0530c40cd8
6 changed files with 93 additions and 169 deletions

View File

@ -110,7 +110,6 @@ public:
__uint128_t opentelemetry_trace_id;
UInt64 opentelemetry_current_span_id;
std::unique_ptr<OpenTelemetrySpanHolder> opentelemetry_thread_span;
protected:
ThreadGroupStatusPtr thread_group;

View File

@ -79,6 +79,7 @@ FormatSettings getFormatSettings(const Context & context,
format_settings.input_allow_errors_num = settings.input_format_allow_errors_num;
format_settings.input_allow_errors_ratio = settings.input_format_allow_errors_ratio;
format_settings.json.escape_forward_slashes = settings.output_format_json_escape_forward_slashes;
format_settings.json.named_tuple_as_object = settings.output_format_json_named_tuple_as_object;
format_settings.json.quote_64bit_integers = settings.output_format_json_quote_64bit_integers;
format_settings.json.quote_denormals = settings.output_format_json_quote_denormals;
format_settings.null_as_default = settings.input_format_null_as_default;

View File

@ -40,7 +40,16 @@ void OpenTelemetrySpanLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(finish_time_us);
columns[i++]->insert(DateLUT::instance().toDayNum(finish_time_us / 1000000));
columns[i++]->insert(attribute_names);
columns[i++]->insert(attribute_values);
// The user might add some ints values, and we will have Int Field, and the
// insert will fail because the column requires Strings. Convert the fields
// here, because it's hard to remember to convert them in all other places.
Array string_values;
string_values.reserve(attribute_values.size());
for (auto & value : attribute_values)
{
string_values.push_back(toString(value));
}
columns[i++]->insert(string_values);
}
OpenTelemetrySpanHolder::OpenTelemetrySpanHolder(const std::string & _operation_name)
@ -59,8 +68,8 @@ OpenTelemetrySpanHolder::OpenTelemetrySpanHolder(const std::string & _operation_
start_time_us = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
// *** remove this
attribute_names.push_back("start.stacktrace");
// ****** remove this
attribute_names.push_back("clickhouse.start.stacktrace");
attribute_values.push_back(StackTrace().toString());
thread.opentelemetry_current_span_id = span_id;
@ -70,8 +79,6 @@ OpenTelemetrySpanHolder::~OpenTelemetrySpanHolder()
{
try
{
fmt::print(stderr, "{}\n", StackTrace().toString());
if (!trace_id)
{
return;
@ -90,8 +97,6 @@ OpenTelemetrySpanHolder::~OpenTelemetrySpanHolder()
return;
}
fmt::print(stderr, "1\n");
auto * context = thread_group->query_context;
if (!context)
{
@ -104,8 +109,8 @@ OpenTelemetrySpanHolder::~OpenTelemetrySpanHolder()
//******** remove this
attribute_names.push_back("clickhouse.query_id");
attribute_values.push_back(context->getCurrentQueryId());
fmt::print(stderr, "2\n");
attribute_names.push_back("clickhouse.end.stacktrace");
attribute_values.push_back(StackTrace().toString());
auto log = context->getOpenTelemetrySpanLog();
if (!log)
@ -114,8 +119,6 @@ OpenTelemetrySpanHolder::~OpenTelemetrySpanHolder()
return;
}
fmt::print(stderr, "3\n");
finish_time_us = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
@ -126,8 +129,6 @@ OpenTelemetrySpanHolder::~OpenTelemetrySpanHolder()
log->add(OpenTelemetrySpanLogElement(
static_cast<OpenTelemetrySpan>(*this)));
fmt::print(stderr, "4\n");
}
catch (...)
{

View File

@ -113,14 +113,13 @@ void ThreadStatus::setupState(const ThreadGroupStatusPtr & thread_group_)
applyQuerySettings();
opentelemetry_trace_id = query_context->getClientInfo().opentelemetry_trace_id;
opentelemetry_current_span_id = query_context->getClientInfo().opentelemetry_span_id;
if (opentelemetry_trace_id)
{
// Register the span for our thread. We might not know the name yet
// -- there are no strong constraints on when it is set relative to
// attaching the thread to query. Will set the name when the span ends.
opentelemetry_thread_span.reset(new OpenTelemetrySpanHolder(""));
opentelemetry_current_span_id = thread_local_rng();
}
else
{
opentelemetry_current_span_id = 0;
}
}
else
@ -319,11 +318,38 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
assertState({ThreadState::AttachedToQuery}, __PRETTY_FUNCTION__);
if (opentelemetry_thread_span)
std::shared_ptr<OpenTelemetrySpanLog> opentelemetry_span_log;
if (opentelemetry_trace_id && query_context)
{
opentelemetry_thread_span->operation_name = getThreadName();
opentelemetry_thread_span->attribute_names.push_back("clickhouse.thread_id");
opentelemetry_thread_span->attribute_values.push_back(thread_id);
opentelemetry_span_log = query_context->getOpenTelemetrySpanLog();
}
if (opentelemetry_span_log)
{
// Log the current thread span.
// We do this manually, because we can't use OpenTelemetrySpanHolder as a
// ThreadStatus member, because of linking issues. This file is linked
// separately, so we can reference OpenTelemetrySpanLog here, but if we had
// the span holder as a field, we would have to reference it in the
// destructor, which is in another library.
OpenTelemetrySpanLogElement span;
span.trace_id = opentelemetry_trace_id;
// Might be problematic if some span holder isn't finished by the time
// we detach this thread...
span.span_id = opentelemetry_current_span_id;
span.parent_span_id = query_context->getClientInfo().opentelemetry_span_id;
span.operation_name = getThreadName();
span.start_time_us = query_start_time_microseconds;
span.finish_time_us =
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
// We could use a more precise and monotonic counter for this.
span.duration_ns = (span.finish_time_us - span.start_time_us) * 1000;
span.attribute_names.push_back("clickhouse.thread_id");
span.attribute_values.push_back(thread_id);
opentelemetry_span_log->add(span);
}
finalizeQueryProfiler();
@ -338,7 +364,6 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
query_id.clear();
query_context = nullptr;
opentelemetry_thread_span = nullptr;
opentelemetry_trace_id = 0;
opentelemetry_current_span_id = 0;
thread_group.reset();

View File

@ -1,50 +1,12 @@
#include <Processors/Formats/Impl/JSONRowOutputFormat.h>
#include <DataTypes/DataTypeTuple.h>
#include <Columns/ColumnTuple.h>
#include <Common/assert_cast.h>
#include <Common/typeid_cast.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferValidUTF8.h>
#include <Processors/Formats/Impl/JSONRowOutputFormat.h>
#include <Formats/FormatFactory.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
void JSONRowOutputFormat::addColumn(String name, DataTypePtr type,
bool & need_validate_utf8, std::string tabs)
{
if (!type->textCanContainOnlyValidUTF8())
need_validate_utf8 = true;
WriteBufferFromOwnString buf;
writeJSONString(name, buf, settings);
const auto * as_tuple = typeid_cast<const DataTypeTuple *>(type.get());
const bool recurse = settings.json.named_tuple_as_object
&& as_tuple && as_tuple->haveExplicitNames();
fields.emplace_back(FieldInfo{buf.str(), type, recurse, tabs});
if (recurse)
{
const auto & element_types = as_tuple->getElements();
const auto & names = as_tuple->getElementNames();
assert(element_types.size() == names.size());
for (size_t i = 0; i < element_types.size(); i++)
{
addColumn(names[i], element_types[i], need_validate_utf8, tabs + "\t");
}
}
}
JSONRowOutputFormat::JSONRowOutputFormat(
WriteBuffer & out_,
const Block & header,
@ -55,21 +17,19 @@ JSONRowOutputFormat::JSONRowOutputFormat(
{
const auto & sample = getPort(PortKind::Main).getHeader();
NamesAndTypesList columns(sample.getNamesAndTypesList());
fields.assign(columns.begin(), columns.end());
fields.reserve(columns.size());
const std::string initial_tabs = settings.json.write_metadata ? "\t\t\t" : "\t\t";
bool need_validate_utf8 = false;
for (const auto & column : columns)
for (size_t i = 0; i < sample.columns(); ++i)
{
addColumn(column.name, column.type, need_validate_utf8, initial_tabs);
}
if (!sample.getByPosition(i).type->textCanContainOnlyValidUTF8())
need_validate_utf8 = true;
// for (size_t i = 0; i < fields.size(); i++)
// {
// fmt::print(stderr, "{}: '{}' '{}' '{}\n",
// i, fields[i].name, fields[i].type->getName(), fields[i].recurse);
// }
WriteBufferFromOwnString buf;
writeJSONString(fields[i].name, buf, settings);
fields[i].name = buf.str();
}
if (need_validate_utf8)
{
@ -83,76 +43,40 @@ JSONRowOutputFormat::JSONRowOutputFormat(
void JSONRowOutputFormat::writePrefix()
{
if (settings.json.write_metadata)
writeCString("{\n", *ostr);
writeCString("\t\"meta\":\n", *ostr);
writeCString("\t[\n", *ostr);
for (size_t i = 0; i < fields.size(); ++i)
{
writeCString("{\n", *ostr);
writeCString("\t\"meta\":\n", *ostr);
writeCString("\t[\n", *ostr);
writeCString("\t\t{\n", *ostr);
for (size_t i = 0; i < fields.size(); ++i)
{
writeCString("\t\t{\n", *ostr);
writeCString("\t\t\t\"name\": ", *ostr);
writeString(fields[i].name, *ostr);
writeCString(",\n", *ostr);
writeCString("\t\t\t\"type\": ", *ostr);
writeJSONString(fields[i].type->getName(), *ostr, settings);
writeChar('\n', *ostr);
writeCString("\t\t}", *ostr);
if (i + 1 < fields.size())
writeChar(',', *ostr);
writeChar('\n', *ostr);
}
writeCString("\t],\n", *ostr);
writeCString("\t\t\t\"name\": ", *ostr);
writeString(fields[i].name, *ostr);
writeCString(",\n", *ostr);
writeCString("\t\t\t\"type\": ", *ostr);
writeJSONString(fields[i].type->getName(), *ostr, settings);
writeChar('\n', *ostr);
writeCString("\t\t}", *ostr);
if (i + 1 < fields.size())
writeChar(',', *ostr);
writeChar('\n', *ostr);
writeCString("\t\"data\":\n", *ostr);
writeCString("\t", *ostr);
}
writeCString("[\n", *ostr);
writeCString("\t],\n", *ostr);
writeChar('\n', *ostr);
writeCString("\t\"data\":\n", *ostr);
writeCString("\t[\n", *ostr);
}
void JSONRowOutputFormat::writeField(const IColumn & column, const IDataType & type, size_t row_num)
{
// fmt::print(stderr, "write field column '{}' type '{}'\n",
// column.getName(), type.getName());
writeString(fields[field_number].tabs, *ostr);
writeCString("\t\t\t", *ostr);
writeString(fields[field_number].name, *ostr);
writeCString(": ", *ostr);
// Sanity check: the input column type is the same as in header block.
// If I don't write out the raw pointer explicitly, for some reason clang
// complains about side effect in dereferencing the pointer:
// src/Processors/Formats/Impl/JSONRowOutputFormat.cpp:120:35: warning: expression with side effects will be evaluated despite being used as an operand to 'typeid' [-Wpotentially-evaluated-expression]
[[maybe_unused]] const IDataType * raw_ptr = fields[field_number].type.get();
assert(typeid(type) == typeid(*raw_ptr));
if (fields[field_number].recurse)
{
const auto & tabs = fields[field_number].tabs;
++field_number;
const auto & tuple_column = assert_cast<const ColumnTuple &>(column);
const auto & nested_columns = tuple_column.getColumns();
writeCString("{\n", *ostr);
for (size_t i = 0; i < nested_columns.size(); i++)
{
// field_number is incremented inside, and should match the nested
// columns.
writeField(*nested_columns[i], *fields[field_number].type, row_num);
if (i + 1 < nested_columns.size())
{
writeCString(",", *ostr);
}
writeCString("\n", *ostr);
}
writeString(tabs, *ostr);
writeCString("}", *ostr);
return;
}
if (yield_strings)
{
WriteBufferFromOwnString buf;
@ -220,12 +144,6 @@ void JSONRowOutputFormat::writeSuffix()
void JSONRowOutputFormat::writeBeforeTotals()
{
if (!settings.json.write_metadata)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Cannot output totals in JSON format without metadata");
}
writeCString(",\n", *ostr);
writeChar('\n', *ostr);
writeCString("\t\"totals\":\n", *ostr);
@ -254,12 +172,6 @@ void JSONRowOutputFormat::writeAfterTotals()
void JSONRowOutputFormat::writeBeforeExtremes()
{
if (!settings.json.write_metadata)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Cannot output extremes in JSON format without metadata");
}
writeCString(",\n", *ostr);
writeChar('\n', *ostr);
writeCString("\t\"extremes\":\n", *ostr);
@ -305,20 +217,17 @@ void JSONRowOutputFormat::writeAfterExtremes()
void JSONRowOutputFormat::writeLastSuffix()
{
if (settings.json.write_metadata)
{
writeCString(",\n\n", *ostr);
writeCString("\t\"rows\": ", *ostr);
writeIntText(row_count, *ostr);
writeCString(",\n\n", *ostr);
writeCString("\t\"rows\": ", *ostr);
writeIntText(row_count, *ostr);
writeRowsBeforeLimitAtLeast();
writeRowsBeforeLimitAtLeast();
if (settings.write_statistics)
writeStatistics();
if (settings.write_statistics)
writeStatistics();
writeChar('\n', *ostr);
writeCString("}\n", *ostr);
}
writeChar('\n', *ostr);
writeCString("}\n", *ostr);
ostr->next();
}

View File

@ -70,8 +70,6 @@ protected:
void writeRowsBeforeLimitAtLeast();
void writeStatistics();
void addColumn(String name, DataTypePtr type, bool & need_validate_utf8,
std::string tabs);
std::unique_ptr<WriteBuffer> validating_ostr; /// Validates UTF-8 sequences, replaces bad sequences with replacement character.
WriteBuffer * ostr;
@ -80,16 +78,7 @@ protected:
size_t row_count = 0;
bool applied_limit = false;
size_t rows_before_limit = 0;
struct FieldInfo
{
String name;
DataTypePtr type;
bool recurse = false;
std::string tabs;
};
std::vector<FieldInfo> fields;
NamesAndTypes fields;
Progress progress;
Stopwatch watch;