Support prometheus remote write protocol.

Vitaly Baranov 2024-05-27 17:19:36 +02:00
parent 7d88995f42
commit 24103c733d
19 changed files with 973 additions and 3 deletions

View File

@@ -78,6 +78,7 @@ add_headers_and_sources(clickhouse_common_io Common/Scheduler)
add_headers_and_sources(clickhouse_common_io Common/Scheduler/Nodes)
add_headers_and_sources(clickhouse_common_io IO)
add_headers_and_sources(clickhouse_common_io IO/Archives)
add_headers_and_sources(clickhouse_common_io IO/Protobuf)
add_headers_and_sources(clickhouse_common_io IO/S3)
add_headers_and_sources(clickhouse_common_io IO/AzureBlobStorage)
list (REMOVE_ITEM clickhouse_common_io_sources Common/malloc.cpp Common/new_delete.cpp)
@@ -470,6 +471,7 @@ dbms_target_link_libraries (PUBLIC ch_contrib::sparsehash)
if (TARGET ch_contrib::protobuf)
dbms_target_link_libraries (PRIVATE ch_contrib::protobuf)
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::protobuf)
endif ()
if (TARGET clickhouse_grpc_protos)

View File

@@ -604,6 +604,10 @@
M(723, PARQUET_EXCEPTION) \
M(724, TOO_MANY_TABLES) \
M(725, TOO_MANY_DATABASES) \
M(726, UNEXPECTED_HTTP_HEADERS) \
M(727, UNEXPECTED_TABLE_ENGINE) \
M(728, UNEXPECTED_DATA_TYPE) \
M(729, ILLEGAL_TIME_SERIES_TAGS) \
\
M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \

View File

@@ -0,0 +1,56 @@
#include "config.h"
#if USE_PROTOBUF
#include <IO/Protobuf/ProtobufZeroCopyInputStreamFromReadBuffer.h>
#include <IO/ReadBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
ProtobufZeroCopyInputStreamFromReadBuffer::ProtobufZeroCopyInputStreamFromReadBuffer(std::unique_ptr<ReadBuffer> in_) : in(std::move(in_))
{
}
ProtobufZeroCopyInputStreamFromReadBuffer::~ProtobufZeroCopyInputStreamFromReadBuffer() = default;
bool ProtobufZeroCopyInputStreamFromReadBuffer::Next(const void ** data, int * size)
{
if (in->eof())
return false;
*data = in->position();
*size = static_cast<int>(in->available());
in->position() += *size;
return true;
}
void ProtobufZeroCopyInputStreamFromReadBuffer::BackUp(int count)
{
if (static_cast<Int64>(in->offset()) < count)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"ProtobufZeroCopyInputStreamFromReadBuffer::BackUp() cannot back up {} bytes (max = {} bytes)",
count,
in->offset());
in->position() -= count;
}
bool ProtobufZeroCopyInputStreamFromReadBuffer::Skip(int count)
{
return static_cast<Int64>(in->tryIgnore(count)) == count;
}
int64_t ProtobufZeroCopyInputStreamFromReadBuffer::ByteCount() const
{
return in->count();
}
}
#endif
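A usage sketch (not part of this commit; the helper name and the use of ReadBufferFromString are illustrative assumptions) showing how the adapter lets protobuf parse straight out of a ClickHouse ReadBuffer without an intermediate copy:

#include <IO/Protobuf/ProtobufZeroCopyInputStreamFromReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <prompb/remote.pb.h>
#include <stdexcept>

/// Hypothetical helper: parses a serialized WriteRequest held in a string.
prometheus::WriteRequest parseWriteRequest(const std::string & serialized)
{
    DB::ProtobufZeroCopyInputStreamFromReadBuffer stream{
        std::make_unique<DB::ReadBufferFromString>(serialized)};
    prometheus::WriteRequest request;
    if (!request.ParseFromZeroCopyStream(&stream))
        throw std::runtime_error("Cannot parse WriteRequest");
    return request;
}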

View File

@@ -0,0 +1,38 @@
#pragma once
#include "config.h"
#if USE_PROTOBUF
#include <google/protobuf/io/zero_copy_stream.h>
namespace DB
{
class ReadBuffer;
class ProtobufZeroCopyInputStreamFromReadBuffer : public google::protobuf::io::ZeroCopyInputStream
{
public:
explicit ProtobufZeroCopyInputStreamFromReadBuffer(std::unique_ptr<ReadBuffer> in_);
~ProtobufZeroCopyInputStreamFromReadBuffer() override;
// Obtains a chunk of data from the stream.
bool Next(const void ** data, int * size) override;
// Backs up a number of bytes, so that the next call to Next() returns
// data again that was already returned by the last call to Next().
void BackUp(int count) override;
// Skips a number of bytes.
bool Skip(int count) override;
// Returns the total number of bytes read since this object was created.
int64_t ByteCount() const override;
private:
std::unique_ptr<ReadBuffer> in;
};
}
#endif

View File

@@ -254,6 +254,8 @@ String toString(ClientInfo::Interface interface)
return "LOCAL";
case ClientInfo::Interface::TCP_INTERSERVER:
return "TCP_INTERSERVER";
case ClientInfo::Interface::PROMETHEUS:
return "PROMETHEUS";
}
return std::format("Unknown server interface ({}).", static_cast<int>(interface));

View File

@@ -38,6 +38,7 @@ public:
POSTGRESQL = 5,
LOCAL = 6,
TCP_INTERSERVER = 7,
PROMETHEUS = 8,
};
enum class HTTPMethod : uint8_t

View File

@@ -105,9 +105,10 @@ ColumnsDescription SessionLogElement::getColumnsDescription()
{"MySQL", static_cast<Int8>(Interface::MYSQL)},
{"PostgreSQL", static_cast<Int8>(Interface::POSTGRESQL)},
{"Local", static_cast<Int8>(Interface::LOCAL)},
{"TCP_Interserver", static_cast<Int8>(Interface::TCP_INTERSERVER)}
{"TCP_Interserver", static_cast<Int8>(Interface::TCP_INTERSERVER)},
{"Prometheus", static_cast<Int8>(Interface::PROMETHEUS)},
});
static_assert(magic_enum::enum_count<Interface>() == 7);
static_assert(magic_enum::enum_count<Interface>() == 8);
auto lc_string_datatype = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());

View File

@@ -0,0 +1,22 @@
#include <Server/HTTP/checkHTTPHeader.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNEXPECTED_HTTP_HEADERS;
}
void checkHTTPHeader(const HTTPRequest & request, const String & header_name, const String & expected_value)
{
if (!request.has(header_name))
throw Exception(ErrorCodes::UNEXPECTED_HTTP_HEADERS, "No HTTP header {}", header_name);
if (request.get(header_name) != expected_value)
throw Exception(ErrorCodes::UNEXPECTED_HTTP_HEADERS, "HTTP header {} has unexpected value '{}' (instead of '{}')", header_name, request.get(header_name), expected_value);
}
}

View File

@@ -0,0 +1,13 @@
#pragma once
#include <Server/HTTP/HTTPRequest.h>
#include <base/types.h>
namespace DB
{
/// Checks that the HTTP request has a specified header with a specified value.
void checkHTTPHeader(const HTTPRequest & request, const String & header_name, const String & expected_value);
}

View File

@@ -9,6 +9,19 @@
#include <Server/PrometheusMetricsWriter.h>
#include "config.h"
#include <Access/Credentials.h>
#include <Common/CurrentThread.h>
#include <IO/SnappyReadBuffer.h>
#include <IO/Protobuf/ProtobufZeroCopyInputStreamFromReadBuffer.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/Session.h>
#include <Server/HTTP/HTMLForm.h>
#include <Server/HTTP/authenticateUserByHTTP.h>
#include <Server/HTTP/checkHTTPHeader.h>
#include <Server/HTTP/setReadOnlyIfHTTPMethodIdempotent.h>
#include <Storages/TimeSeries/PrometheusRemoteWriteProtocol.h>
namespace DB
{
@@ -74,6 +87,154 @@ public:
};
/// Base implementation of a protocol with Context and authentication.
class PrometheusRequestHandler::ImplWithContext : public Impl
{
public:
explicit ImplWithContext(PrometheusRequestHandler & parent) : Impl(parent), default_settings(parent.server.context()->getSettingsRef()) { }
virtual void handlingRequestWithContext(HTTPServerRequest & request, HTTPServerResponse & response) = 0;
protected:
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override
{
SCOPE_EXIT({
request_credentials.reset();
context.reset();
session.reset();
params.reset();
});
params = std::make_unique<HTMLForm>(default_settings, request);
parent().send_stacktrace = config().is_stacktrace_enabled && params->getParsed<bool>("stacktrace", false);
if (!authenticateUserAndMakeContext(request, response))
return; /// The user is not authenticated yet, and the HTTP_UNAUTHORIZED response is sent with the "WWW-Authenticate" header,
/// and `request_credentials` must be preserved until the next request or until any exception.
/// Initialize query scope.
std::optional<CurrentThread::QueryScope> query_scope;
if (context)
query_scope.emplace(context);
handlingRequestWithContext(request, response);
}
bool authenticateUserAndMakeContext(HTTPServerRequest & request, HTTPServerResponse & response)
{
session = std::make_unique<Session>(server().context(), ClientInfo::Interface::PROMETHEUS, request.isSecure());
if (!authenticateUser(request, response))
return false;
makeContext(request);
return true;
}
bool authenticateUser(HTTPServerRequest & request, HTTPServerResponse & response)
{
return authenticateUserByHTTP(request, *params, response, *session, request_credentials, server().context(), log());
}
void makeContext(HTTPServerRequest & request)
{
context = session->makeQueryContext();
/// Anything other than HTTP POST should be treated as a read-only query.
setReadOnlyIfHTTPMethodIdempotent(context, request.getMethod());
auto roles = params->getAll("role");
if (!roles.empty())
context->setCurrentRoles(roles);
auto param_could_be_skipped = [&] (const String & name)
{
/// Empty parameters appear in URLs like ?&a=b or a=b&&c=d. Just skip them for the user's convenience.
if (name.empty())
return true;
/// Some parameters (database, default_format, everything used in the code above) do not
/// belong to the Settings class.
static const NameSet reserved_param_names{"user", "password", "quota_key", "stacktrace", "role", "query_id"};
return reserved_param_names.contains(name);
};
/// Settings can be overridden in the query.
SettingsChanges settings_changes;
for (const auto & [key, value] : *params)
{
if (!param_could_be_skipped(key))
{
/// Parameters other than the reserved ones are treated as settings.
settings_changes.push_back({key, value});
}
}
context->checkSettingsConstraints(settings_changes, SettingSource::QUERY);
context->applySettingsChanges(settings_changes);
/// Set the query id supplied by the user, if any, and also update the OpenTelemetry fields.
context->setCurrentQueryId(params->get("query_id", request.get("X-ClickHouse-Query-Id", "")));
}
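/// For example, a request like
///   POST /write?user=default&role=writer&max_insert_threads=4
/// has "user" consumed by authentication and "role" applied via setCurrentRoles() above,
/// while "max_insert_threads=4" goes through the settings-constraints check and is applied
/// as a per-query setting change.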
void onException() override
{
// So that subsequent requests on this connection always start afresh in case of exceptions.
request_credentials.reset();
}
const Settings & default_settings;
std::unique_ptr<HTMLForm> params;
std::unique_ptr<Session> session;
std::unique_ptr<Credentials> request_credentials;
ContextMutablePtr context;
};
/// Implementation of the remote-write protocol.
class PrometheusRequestHandler::RemoteWriteImpl : public ImplWithContext
{
public:
using ImplWithContext::ImplWithContext;
void beforeHandlingRequest(HTTPServerRequest & request) override
{
LOG_INFO(log(), "Handling remote write request from {}", request.get("User-Agent", ""));
chassert(config().type == PrometheusRequestHandlerConfig::Type::RemoteWrite);
}
void handlingRequestWithContext([[maybe_unused]] HTTPServerRequest & request, [[maybe_unused]] HTTPServerResponse & response) override
{
#if USE_PROMETHEUS_PROTOBUFS
checkHTTPHeader(request, "Content-Type", "application/x-protobuf");
checkHTTPHeader(request, "Content-Encoding", "snappy");
ProtobufZeroCopyInputStreamFromReadBuffer zero_copy_input_stream{
std::make_unique<SnappyReadBuffer>(wrapReadBufferReference(request.getStream()))};
prometheus::WriteRequest write_request;
if (!write_request.ParsePartialFromZeroCopyStream(&zero_copy_input_stream))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse WriteRequest");
auto table = DatabaseCatalog::instance().getTable(StorageID{config().time_series_table_name}, context);
PrometheusRemoteWriteProtocol protocol{table, context};
if (write_request.timeseries_size())
protocol.writeTimeSeries(write_request.timeseries());
if (write_request.metadata_size())
protocol.writeMetricsMetadata(write_request.metadata());
response.setContentType("text/plain; charset=UTF-8");
response.send();
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Prometheus remote write protocol is disabled");
#endif
}
};
PrometheusRequestHandler::PrometheusRequestHandler(
IServer & server_,
const PrometheusRequestHandlerConfig & config_,
@@ -99,6 +260,11 @@ void PrometheusRequestHandler::createImpl()
impl = std::make_unique<ExposeMetricsImpl>(*this);
return;
}
case PrometheusRequestHandlerConfig::Type::RemoteWrite:
{
impl = std::make_unique<RemoteWriteImpl>(*this);
return;
}
}
UNREACHABLE();
}
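For reference, a conforming remote-write client POSTs a snappy-compressed protobuf body with exactly the two headers checked above. A minimal client sketch using Poco and snappy (the host, port 9363, and the "/write" path are assumptions; they depend on the server's handler configuration):

#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <snappy.h>
#include <prompb/remote.pb.h>

void sendRemoteWrite(const prometheus::WriteRequest & write_request)
{
    std::string serialized;
    write_request.SerializeToString(&serialized);

    /// The handler requires "Content-Encoding: snappy", so compress the body.
    std::string compressed;
    snappy::Compress(serialized.data(), serialized.size(), &compressed);

    Poco::Net::HTTPClientSession session("localhost", 9363);
    Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, "/write");
    request.setContentType("application/x-protobuf");
    request.set("Content-Encoding", "snappy");
    request.setContentLength(static_cast<std::streamsize>(compressed.size()));

    session.sendRequest(request) << compressed;

    Poco::Net::HTTPResponse response;
    session.receiveResponse(response);
    /// On success the handler replies with an empty "text/plain" 200 response.
}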

View File

@@ -11,7 +11,7 @@ class IServer;
class PrometheusMetricsWriter;
class WriteBufferFromHTTPServerResponse;
/// Handles requests for prometheus protocols (expose_metrics).
/// Handles requests for prometheus protocols (expose_metrics, remote_write).
class PrometheusRequestHandler : public HTTPRequestHandler
{
public:
@@ -41,7 +41,9 @@ private:
const LoggerPtr log;
class Impl;
class ImplWithContext;
class ExposeMetricsImpl;
class RemoteWriteImpl;
std::unique_ptr<Impl> impl;
String http_method;

View File

@@ -1,5 +1,7 @@
#pragma once
#include <Core/QualifiedTableName.h>
namespace DB
{
@@ -11,6 +13,9 @@ struct PrometheusRequestHandlerConfig
{
/// Exposes ClickHouse metrics for scraping by Prometheus.
ExposeMetrics,
/// Handles Prometheus remote-write protocol.
RemoteWrite,
};
Type type = Type::ExposeMetrics;
@@ -21,6 +26,9 @@ struct PrometheusRequestHandlerConfig
bool expose_events = false;
bool expose_errors = false;
/// Settings for types RemoteWrite, RemoteRead:
QualifiedTableName time_series_table_name;
size_t keep_alive_timeout = 0;
bool is_stacktrace_enabled = true;
};

View File

@@ -45,18 +45,52 @@ namespace
return res;
}
/// Extracts a qualified table name from the config. It can be set either as
/// <table>mydb.prometheus</table>
/// or
/// <database>mydb</database>
/// <table>prometheus</table>
QualifiedTableName parseTableNameFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
QualifiedTableName res;
res.table = config.getString(config_prefix + ".table", "prometheus");
res.database = config.getString(config_prefix + ".database", "");
if (res.database.empty())
res = QualifiedTableName::parseFromString(res.table);
if (res.database.empty())
res.database = "default";
return res;
}
/// Parses a configuration like this:
/// <!-- <type>remote_write</type> (Implied, not actually parsed) -->
/// <table>db.time_series_table_name</table>
PrometheusRequestHandlerConfig parseRemoteWriteConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
PrometheusRequestHandlerConfig res;
res.type = PrometheusRequestHandlerConfig::Type::RemoteWrite;
res.time_series_table_name = parseTableNameFromConfig(config, config_prefix);
parseCommonConfig(config, res);
return res;
}
/// Parses a configuration like this:
/// <type>expose_metrics</type>
/// <metrics>true</metrics>
/// <asynchronous_metrics>true</asynchronous_metrics>
/// <events>true</events>
/// <errors>true</errors>
/// -OR-
/// <type>remote_write</type>
/// <table>db.time_series_table_name</table>
PrometheusRequestHandlerConfig parseHandlerConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
String type = config.getString(config_prefix + ".type");
if (type == "expose_metrics")
return parseExposeMetricsConfig(config, config_prefix);
else if (type == "remote_write")
return parseRemoteWriteConfig(config, config_prefix);
else
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "Unknown type {} is specified in the configuration for a prometheus protocol", type);
}
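For illustration, parseTableNameFromConfig above resolves the supported config variants like this (a sketch of its behavior, with "default" as the fallback database):

/// <table>mydb.prometheus</table>                        -> {database: "mydb",    table: "prometheus"}
/// <database>mydb</database> + <table>prometheus</table> -> {database: "mydb",    table: "prometheus"}
/// <table>prometheus</table>                             -> {database: "default", table: "prometheus"}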

View File

@@ -74,6 +74,13 @@ HTTPRequestHandlerFactoryPtr createPrometheusHandlerFactory(
/// <errors>true</errors>
/// </handler>
/// </my_rule_1>
/// <my_rule2>
/// <url>/write</url>
/// <handler>
/// <type>remote_write</type>
/// <table>db.time_series_table_name</table>
/// </handler>
/// </my_rule2>
/// </http_handlers>
HTTPRequestHandlerFactoryPtr createPrometheusHandlerFactoryForHTTPRule(
IServer & server,

View File

@@ -25,6 +25,7 @@ namespace ErrorCodes
extern const int INCORRECT_QUERY;
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
extern const int UNEXPECTED_TABLE_ENGINE;
}
@@ -427,6 +428,29 @@ SinkToStoragePtr StorageTimeSeries::write(const ASTPtr & query, const StorageMet
}
std::shared_ptr<StorageTimeSeries> storagePtrToTimeSeries(StoragePtr storage)
{
if (auto res = typeid_cast<std::shared_ptr<StorageTimeSeries>>(storage))
return res;
throw Exception(
ErrorCodes::UNEXPECTED_TABLE_ENGINE,
"This operation can be executed on a TimeSeries table only, the engine of table {} is not TimeSeries",
storage->getStorageID().getNameForLogs());
}
std::shared_ptr<const StorageTimeSeries> storagePtrToTimeSeries(ConstStoragePtr storage)
{
if (auto res = typeid_cast<std::shared_ptr<const StorageTimeSeries>>(storage))
return res;
throw Exception(
ErrorCodes::UNEXPECTED_TABLE_ENGINE,
"This operation can be executed on a TimeSeries table only, the engine of table {} is not TimeSeries",
storage->getStorageID().getNameForLogs());
}
void registerStorageTimeSeries(StorageFactory & factory)
{
factory.registerStorage("TimeSeries", [](const StorageFactory::Arguments & args)

View File

@@ -2,6 +2,7 @@
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/IStorage.h>
@@ -104,4 +105,7 @@ private:
bool has_inner_tables;
};
std::shared_ptr<StorageTimeSeries> storagePtrToTimeSeries(StoragePtr storage);
std::shared_ptr<const StorageTimeSeries> storagePtrToTimeSeries(ConstStoragePtr storage);
}

View File

@@ -0,0 +1,538 @@
#include <Storages/TimeSeries/PrometheusRemoteWriteProtocol.h>
#include "config.h"
#if USE_PROMETHEUS_PROTOBUFS
#include <Core/Field.h>
#include <Core/DecimalFunctions.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeString.h>
#include <Storages/StorageTimeSeries.h>
#include <Storages/TimeSeries/TimeSeriesColumnNames.h>
#include <Storages/TimeSeries/TimeSeriesColumnsValidator.h>
#include <Storages/TimeSeries/TimeSeriesTagNames.h>
#include <Storages/TimeSeries/TimeSeriesSettings.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/addMissingDefaults.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTInsertQuery.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/Sources/BlocksSource.h>
#include <Processors/Transforms/ExpressionTransform.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TIME_SERIES_TAGS;
extern const int ILLEGAL_COLUMN;
}
namespace
{
/// Checks that a specified set of labels is sorted, contains no duplicate names, and includes a label named "__name__".
void checkLabels(const ::google::protobuf::RepeatedPtrField<::prometheus::Label> & labels)
{
bool metric_name_found = false;
for (size_t i = 0; i != static_cast<size_t>(labels.size()); ++i)
{
const auto & label = labels[static_cast<int>(i)];
const auto & label_name = label.name();
const auto & label_value = label.value();
if (label_name.empty())
throw Exception(ErrorCodes::ILLEGAL_TIME_SERIES_TAGS, "Label name should not be empty");
if (label_value.empty())
continue; /// Empty label value is treated like the label doesn't exist.
if (label_name == TimeSeriesTagNames::MetricName)
metric_name_found = true;
if (i)
{
/// Check that labels are sorted.
const auto & previous_label_name = labels[static_cast<int>(i - 1)].name();
if (label_name <= previous_label_name)
{
if (label_name == previous_label_name)
throw Exception(ErrorCodes::ILLEGAL_TIME_SERIES_TAGS, "Found duplicate label {}", label_name);
else
throw Exception(ErrorCodes::ILLEGAL_TIME_SERIES_TAGS, "Label names are not sorted in lexicographical order ({} > {})",
previous_label_name, label_name);
}
}
}
if (!metric_name_found)
throw Exception(ErrorCodes::ILLEGAL_TIME_SERIES_TAGS, "Metric name (label {}) not found", TimeSeriesTagNames::MetricName);
}
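/// For example, labels {"__name__": "http_requests_total", "instance": "host:9100", "job": "node"}
/// pass the check (sorted, since "__name__" < "instance" < "job"; no duplicates; metric name present),
/// while {"job": "node", "instance": "host:9100"} fail twice: the names are not sorted
/// and there is no "__name__" label.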
/// Finds the description of an insertable column in the list.
const ColumnDescription & getInsertableColumnDescription(const ColumnsDescription & columns, const String & column_name, const StorageID & time_series_storage_id)
{
const ColumnDescription * column = columns.tryGet(column_name);
if (!column || ((column->default_desc.kind != ColumnDefaultKind::Default) && (column->default_desc.kind != ColumnDefaultKind::Ephemeral)))
{
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "{}: Column {} {}",
time_series_storage_id.getNameForLogs(), column_name, column ? "non-insertable" : "doesn't exist");
}
return *column;
}
/// Calculates the identifier of each time series in "tags_block" using the default expression for the "id" column,
/// and adds column "id" with the results to "tags_block".
IColumn & calculateId(const ContextPtr & context, const ColumnDescription & id_column_description, Block & tags_block)
{
auto blocks = std::make_shared<Blocks>();
blocks->push_back(tags_block);
auto header = tags_block.cloneEmpty();
auto pipe = Pipe(std::make_shared<BlocksSource>(blocks, header));
Block header_with_id;
const auto & id_name = id_column_description.name;
auto id_type = id_column_description.type;
header_with_id.insert(ColumnWithTypeAndName{id_type, id_name});
auto adding_missing_defaults_dag = addMissingDefaults(
pipe.getHeader(),
header_with_id.getNamesAndTypesList(),
ColumnsDescription{id_column_description},
context);
auto adding_missing_defaults_actions = std::make_shared<ExpressionActions>(adding_missing_defaults_dag);
pipe.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<ExpressionTransform>(stream_header, adding_missing_defaults_actions);
});
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
pipe.getHeader().getColumnsWithTypeAndName(),
header_with_id.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Position);
auto actions = std::make_shared<ExpressionActions>(
convert_actions_dag,
ExpressionActionsSettings::fromContext(context, CompileExpressions::yes));
pipe.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<ExpressionTransform>(stream_header, actions);
});
QueryPipeline pipeline{std::move(pipe)};
PullingPipelineExecutor executor{pipeline};
MutableColumnPtr id_column;
Block block_from_executor;
while (executor.pull(block_from_executor))
{
if (block_from_executor)
{
MutableColumnPtr id_column_part = block_from_executor.getByName(id_name).column->assumeMutable();
if (id_column)
id_column->insertRangeFrom(*id_column_part, 0, id_column_part->size());
else
id_column = std::move(id_column_part);
}
}
if (!id_column)
id_column = id_type->createColumn();
IColumn & id_column_ref = *id_column;
tags_block.insert(0, ColumnWithTypeAndName{std::move(id_column), id_type, id_name});
return id_column_ref;
}
/// Converts a timestamp in milliseconds to a DateTime64 with a specified scale.
DateTime64 scaleTimestamp(Int64 timestamp_ms, UInt32 scale)
{
if (scale == 3)
return timestamp_ms;
else if (scale > 3)
return timestamp_ms * DecimalUtils::scaleMultiplier<DateTime64>(scale - 3);
else
return timestamp_ms / DecimalUtils::scaleMultiplier<DateTime64>(3 - scale);
}
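/// Worked example: with timestamp_ms = 1716800000123,
///   scale 3 -> 1716800000123        (milliseconds, returned unchanged)
///   scale 9 -> 1716800000123000000  (multiplied by 10^6: nanoseconds)
///   scale 0 -> 1716800000           (divided by 10^3, truncating: seconds)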
struct BlocksToInsert
{
std::vector<std::pair<ViewTarget::Kind, Block>> blocks;
};
/// Converts time series from the protobuf format to prepared blocks for inserting into target tables.
BlocksToInsert toBlocks(const google::protobuf::RepeatedPtrField<prometheus::TimeSeries> & time_series,
const ContextPtr & context,
const StorageID & time_series_storage_id,
const StorageInMemoryMetadata & time_series_storage_metadata,
const TimeSeriesSettings & time_series_settings)
{
size_t num_tags_rows = time_series.size();
size_t num_data_rows = 0;
for (const auto & element : time_series)
num_data_rows += element.samples_size();
if (!num_data_rows)
return {}; /// Nothing to insert into target tables.
/// Column types must be extracted from the target tables' metadata.
const auto & columns_description = time_series_storage_metadata.columns;
auto get_column_description = [&](const String & column_name) -> const ColumnDescription &
{
return getInsertableColumnDescription(columns_description, column_name, time_series_storage_id);
};
/// We're going to prepare two blocks - one for the "data" table, and one for the "tags" table.
Block data_block, tags_block;
auto make_column_for_data_block = [&](const ColumnDescription & column_description) -> IColumn &
{
auto column = column_description.type->createColumn();
column->reserve(num_data_rows);
auto * column_ptr = column.get();
data_block.insert(ColumnWithTypeAndName{std::move(column), column_description.type, column_description.name});
return *column_ptr;
};
auto make_column_for_tags_block = [&](const ColumnDescription & column_description) -> IColumn &
{
auto column = column_description.type->createColumn();
column->reserve(num_tags_rows);
auto * column_ptr = column.get();
tags_block.insert(ColumnWithTypeAndName{std::move(column), column_description.type, column_description.name});
return *column_ptr;
};
/// Create columns.
/// Column "id".
const auto & id_description = get_column_description(TimeSeriesColumnNames::ID);
TimeSeriesColumnsValidator validator{time_series_storage_id, time_series_settings};
validator.validateColumnForID(id_description);
auto & id_column_in_data_table = make_column_for_data_block(id_description);
/// Column "timestamp".
const auto & timestamp_description = get_column_description(TimeSeriesColumnNames::Timestamp);
UInt32 timestamp_scale;
validator.validateColumnForTimestamp(timestamp_description, timestamp_scale);
auto & timestamp_column = make_column_for_data_block(timestamp_description);
/// Column "value".
const auto & value_description = get_column_description(TimeSeriesColumnNames::Value);
validator.validateColumnForValue(value_description);
auto & value_column = make_column_for_data_block(value_description);
/// Column "metric_name".
const auto & metric_name_description = get_column_description(TimeSeriesColumnNames::MetricName);
validator.validateColumnForMetricName(metric_name_description);
auto & metric_name_column = make_column_for_tags_block(metric_name_description);
/// Columns that we should explicitly check are filled after filling each row.
std::vector<IColumn *> columns_to_fill_in_tags_table;
/// Columns corresponding to specific tags specified in the "tags_to_columns" setting.
std::unordered_map<String, IColumn *> columns_by_tag_name;
const Map & tags_to_columns = time_series_settings.tags_to_columns;
for (const auto & tag_name_and_column_name : tags_to_columns)
{
const auto & tuple = tag_name_and_column_name.safeGet<const Tuple &>();
const auto & tag_name = tuple.at(0).safeGet<String>();
const auto & column_name = tuple.at(1).safeGet<String>();
const auto & column_description = get_column_description(column_name);
validator.validateColumnForTagValue(column_description);
auto & column = make_column_for_tags_block(column_description);
columns_by_tag_name[tag_name] = &column;
columns_to_fill_in_tags_table.emplace_back(&column);
}
/// Column "tags".
const auto & tags_description = get_column_description(TimeSeriesColumnNames::Tags);
validator.validateColumnForTagsMap(tags_description);
auto & tags_column = typeid_cast<ColumnMap &>(make_column_for_tags_block(tags_description));
IColumn & tags_names = tags_column.getNestedData().getColumn(0);
IColumn & tags_values = tags_column.getNestedData().getColumn(1);
auto & tags_offsets = tags_column.getNestedColumn().getOffsets();
/// Column "all_tags".
const auto & all_tags_description = get_column_description(TimeSeriesColumnNames::AllTags);
validator.validateColumnForTagsMap(all_tags_description);
auto & all_tags_column = typeid_cast<ColumnMap &>(make_column_for_tags_block(all_tags_description));
IColumn & all_tags_names = all_tags_column.getNestedData().getColumn(0);
IColumn & all_tags_values = all_tags_column.getNestedData().getColumn(1);
auto & all_tags_offsets = all_tags_column.getNestedColumn().getOffsets();
/// Prepare a block for inserting into the "tags" table.
size_t current_row_in_tags = 0;
for (size_t i = 0; i != static_cast<size_t>(time_series.size()); ++i)
{
const auto & element = time_series[static_cast<int>(i)];
if (!element.samples_size())
continue;
const auto & labels = element.labels();
checkLabels(labels);
for (size_t j = 0; j != static_cast<size_t>(labels.size()); ++j)
{
const auto & label = labels[static_cast<int>(j)];
const auto & tag_name = label.name();
const auto & tag_value = label.value();
if (tag_name == TimeSeriesTagNames::MetricName)
{
metric_name_column.insertData(tag_value.data(), tag_value.length());
}
else
{
all_tags_names.insertData(tag_name.data(), tag_name.length());
all_tags_values.insertData(tag_value.data(), tag_value.length());
auto it = columns_by_tag_name.find(tag_name);
bool has_column_for_tag_value = (it != columns_by_tag_name.end());
if (has_column_for_tag_value)
{
auto * column = it->second;
column->insertData(tag_value.data(), tag_value.length());
}
else
{
tags_names.insertData(tag_name.data(), tag_name.length());
tags_values.insertData(tag_value.data(), tag_value.length());
}
}
}
all_tags_offsets.push_back(all_tags_names.size());
tags_offsets.push_back(tags_names.size());
for (auto * column : columns_to_fill_in_tags_table)
{
if (column->size() == current_row_in_tags)
column->insertDefault();
}
++current_row_in_tags;
}
/// Calculate an identifier for each time series, make a new column from those identifiers, and add it to "tags_block".
auto & id_column_in_tags_table = calculateId(context, columns_description.get(TimeSeriesColumnNames::ID), tags_block);
/// Prepare a block for inserting into the "data" table.
current_row_in_tags = 0;
for (size_t i = 0; i != static_cast<size_t>(time_series.size()); ++i)
{
const auto & element = time_series[static_cast<int>(i)];
if (!element.samples_size())
continue;
id_column_in_data_table.insertManyFrom(id_column_in_tags_table, current_row_in_tags, element.samples_size());
for (const auto & sample : element.samples())
{
timestamp_column.insert(scaleTimestamp(sample.timestamp(), timestamp_scale));
value_column.insert(sample.value());
}
++current_row_in_tags;
}
/// The "all_tags" column in the "tags" table is either ephemeral or doesn't exists.
/// We've used the "all_tags" column to calculate the "id" column already,
/// and now we don't need it to insert to the "tags" table.
tags_block.erase(TimeSeriesColumnNames::AllTags);
BlocksToInsert res;
/// The block for the "tags" table should be inserted first
/// (because any INSERT can fail, and we don't want rows in the "data" table with no corresponding "id" written to the "tags" table).
res.blocks.emplace_back(ViewTarget::Tags, std::move(tags_block));
res.blocks.emplace_back(ViewTarget::Data, std::move(data_block));
return res;
}
std::string_view metricTypeToString(prometheus::MetricMetadata::MetricType metric_type)
{
using namespace std::literals;
switch (metric_type)
{
case prometheus::MetricMetadata::UNKNOWN: return "unknown"sv;
case prometheus::MetricMetadata::COUNTER: return "counter"sv;
case prometheus::MetricMetadata::GAUGE: return "gauge"sv;
case prometheus::MetricMetadata::HISTOGRAM: return "histogram"sv;
case prometheus::MetricMetadata::GAUGEHISTOGRAM: return "gaugehistogram"sv;
case prometheus::MetricMetadata::SUMMARY: return "summary"sv;
case prometheus::MetricMetadata::INFO: return "info"sv;
case prometheus::MetricMetadata::STATESET: return "stateset"sv;
default: break;
}
return "";
}
/// Converts metrics metadata from the protobuf format to prepared blocks for inserting into target tables.
BlocksToInsert toBlocks(const google::protobuf::RepeatedPtrField<prometheus::MetricMetadata> & metrics_metadata,
const StorageID & time_series_storage_id,
const StorageInMemoryMetadata & time_series_storage_metadata,
const TimeSeriesSettings & time_series_settings)
{
size_t num_rows = metrics_metadata.size();
if (!num_rows)
return {}; /// Nothing to insert into target tables.
/// Column types must be extracted from the target tables' metadata.
const auto & columns_description = time_series_storage_metadata.columns;
auto get_column_description = [&](const String & column_name) -> const ColumnDescription &
{
return getInsertableColumnDescription(columns_description, column_name, time_series_storage_id);
};
/// We're going to prepare one block for the "metrics" table.
Block block;
auto make_column = [&](const ColumnDescription & column_description) -> IColumn &
{
auto column = column_description.type->createColumn();
column->reserve(num_rows);
auto * column_ptr = column.get();
block.insert(ColumnWithTypeAndName{std::move(column), column_description.type, column_description.name});
return *column_ptr;
};
/// Create columns.
/// Column "metric_family_name".
const auto & metric_family_name_description = get_column_description(TimeSeriesColumnNames::MetricFamilyName);
TimeSeriesColumnsValidator validator{time_series_storage_id, time_series_settings};
validator.validateColumnForMetricFamilyName(metric_family_name_description);
auto & metric_family_name_column = make_column(metric_family_name_description);
/// Column "type".
const auto & type_description = get_column_description(TimeSeriesColumnNames::Type);
validator.validateColumnForType(type_description);
auto & type_column = make_column(type_description);
/// Column "unit".
const auto & unit_description = get_column_description(TimeSeriesColumnNames::Unit);
validator.validateColumnForUnit(unit_description);
auto & unit_column = make_column(unit_description);
/// Column "help".
const auto & help_description = get_column_description(TimeSeriesColumnNames::Help);
validator.validateColumnForHelp(help_description);
auto & help_column = make_column(help_description);
/// Fill those columns.
for (const auto & element : metrics_metadata)
{
const auto & metric_family_name = element.metric_family_name();
const auto & type_str = metricTypeToString(element.type());
const auto & help = element.help();
const auto & unit = element.unit();
metric_family_name_column.insertData(metric_family_name.data(), metric_family_name.length());
type_column.insertData(type_str.data(), type_str.length());
unit_column.insertData(unit.data(), unit.length());
help_column.insertData(help.data(), help.length());
}
/// Prepare a result.
BlocksToInsert res;
res.blocks.emplace_back(ViewTarget::Metrics, std::move(block));
return res;
}
/// Inserts blocks to target tables.
void insertToTargetTables(BlocksToInsert && blocks, StorageTimeSeries & time_series_storage, ContextPtr context, Poco::Logger * log)
{
auto time_series_storage_id = time_series_storage.getStorageID();
for (auto & [table_kind, block] : blocks.blocks)
{
if (block)
{
const auto & target_table_id = time_series_storage.getTargetTableId(table_kind);
LOG_INFO(log, "{}: Inserting {} rows to the {} table",
time_series_storage_id.getNameForLogs(), block.rows(), toString(table_kind));
auto insert_query = std::make_shared<ASTInsertQuery>();
insert_query->table_id = target_table_id;
auto columns_ast = std::make_shared<ASTExpressionList>();
for (const auto & name : block.getNames())
columns_ast->children.emplace_back(std::make_shared<ASTIdentifier>(name));
insert_query->columns = columns_ast;
ContextMutablePtr insert_context = Context::createCopy(context);
insert_context->setCurrentQueryId(context->getCurrentQueryId() + ":" + String{toString(table_kind)});
InterpreterInsertQuery interpreter(insert_query, insert_context);
BlockIO io = interpreter.execute();
PushingPipelineExecutor executor(io.pipeline);
executor.start();
executor.push(std::move(block));
executor.finish();
}
}
}
}
PrometheusRemoteWriteProtocol::PrometheusRemoteWriteProtocol(StoragePtr time_series_storage_, const ContextPtr & context_)
: WithContext(context_)
, time_series_storage(storagePtrToTimeSeries(time_series_storage_))
, log(getLogger("PrometheusRemoteWriteProtocol"))
{
}
PrometheusRemoteWriteProtocol::~PrometheusRemoteWriteProtocol() = default;
void PrometheusRemoteWriteProtocol::writeTimeSeries(const google::protobuf::RepeatedPtrField<prometheus::TimeSeries> & time_series)
{
auto time_series_storage_id = time_series_storage->getStorageID();
LOG_TRACE(log, "{}: Writing {} time series",
time_series_storage_id.getNameForLogs(), time_series.size());
auto time_series_storage_metadata = time_series_storage->getInMemoryMetadataPtr();
auto time_series_settings = time_series_storage->getStorageSettingsPtr();
auto blocks = toBlocks(time_series, getContext(), time_series_storage_id, *time_series_storage_metadata, *time_series_settings);
insertToTargetTables(std::move(blocks), *time_series_storage, getContext(), log.get());
LOG_TRACE(log, "{}: {} time series written",
time_series_storage_id.getNameForLogs(), time_series.size());
}
void PrometheusRemoteWriteProtocol::writeMetricsMetadata(const google::protobuf::RepeatedPtrField<prometheus::MetricMetadata> & metrics_metadata)
{
auto time_series_storage_id = time_series_storage->getStorageID();
LOG_TRACE(log, "{}: Writing {} metrics metadata",
time_series_storage_id.getNameForLogs(), metrics_metadata.size());
auto time_series_storage_metadata = time_series_storage->getInMemoryMetadataPtr();
auto time_series_settings = time_series_storage->getStorageSettingsPtr();
auto blocks = toBlocks(metrics_metadata, time_series_storage_id, *time_series_storage_metadata, *time_series_settings);
insertToTargetTables(std::move(blocks), *time_series_storage, getContext(), log.get());
LOG_TRACE(log, "{}: {} metrics metadata written",
time_series_storage_id.getNameForLogs(), metrics_metadata.size());
}
}
#endif

View File

@@ -0,0 +1,35 @@
#pragma once
#include "config.h"
#if USE_PROMETHEUS_PROTOBUFS
#include <Interpreters/Context_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <prompb/remote.pb.h>
namespace DB
{
class StorageTimeSeries;
/// Helper class to support the prometheus remote write protocol.
class PrometheusRemoteWriteProtocol : WithContext
{
public:
PrometheusRemoteWriteProtocol(StoragePtr time_series_storage_, const ContextPtr & context_);
~PrometheusRemoteWriteProtocol();
/// Inserts time series received via the remote write protocol into our table.
void writeTimeSeries(const google::protobuf::RepeatedPtrField<prometheus::TimeSeries> & time_series);
/// Inserts metrics metadata received via the remote write protocol into our table.
void writeMetricsMetadata(const google::protobuf::RepeatedPtrField<prometheus::MetricMetadata> & metrics_metadata);
private:
std::shared_ptr<StorageTimeSeries> time_series_storage;
Poco::LoggerPtr log;
};
}
#endif

View File

@@ -0,0 +1,13 @@
#pragma once
namespace DB
{
/// Label names with special meaning.
struct TimeSeriesTagNames
{
static constexpr const char * MetricName = "__name__";
};
}