mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #70688 from ClickHouse/backport/24.8/70358
Backport #70358 to 24.8: Fix possible use-after-free in SYSTEM DROP FORMAT SCHEMA CACHE FOR Protobuf
This commit is contained in:
commit
d9fb393951
@ -112,7 +112,7 @@ private:
|
||||
};
|
||||
|
||||
|
||||
const google::protobuf::Descriptor *
|
||||
ProtobufSchemas::DescriptorHolder
|
||||
ProtobufSchemas::getMessageTypeForFormatSchema(const FormatSchemaInfo & info, WithEnvelope with_envelope, const String & google_protos_path)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
@ -121,10 +121,10 @@ ProtobufSchemas::getMessageTypeForFormatSchema(const FormatSchemaInfo & info, Wi
|
||||
it = importers
|
||||
.emplace(
|
||||
info.schemaDirectory(),
|
||||
std::make_unique<ImporterWithSourceTree>(info.schemaDirectory(), google_protos_path, with_envelope))
|
||||
std::make_shared<ImporterWithSourceTree>(info.schemaDirectory(), google_protos_path, with_envelope))
|
||||
.first;
|
||||
auto * importer = it->second.get();
|
||||
return importer->import(info.schemaPath(), info.messageName());
|
||||
return DescriptorHolder(it->second, importer->import(info.schemaPath(), info.messageName()));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -57,14 +57,31 @@ public:
|
||||
// Clear cached protobuf schemas
|
||||
void clear();
|
||||
|
||||
/// Parses the format schema, then parses the corresponding proto file, and returns the descriptor of the message type.
|
||||
/// The function never returns nullptr, it throws an exception if it cannot load or parse the file.
|
||||
const google::protobuf::Descriptor *
|
||||
class ImporterWithSourceTree;
|
||||
struct DescriptorHolder
|
||||
{
|
||||
DescriptorHolder(std::shared_ptr<ImporterWithSourceTree> importer_, const google::protobuf::Descriptor * message_descriptor_)
|
||||
: importer(std::move(importer_))
|
||||
, message_descriptor(message_descriptor_)
|
||||
{}
|
||||
private:
|
||||
std::shared_ptr<ImporterWithSourceTree> importer;
|
||||
public:
|
||||
const google::protobuf::Descriptor * message_descriptor;
|
||||
};
|
||||
|
||||
/// Parses the format schema, then parses the corresponding proto file, and
|
||||
/// returns holder (since the descriptor only valid if
|
||||
/// ImporterWithSourceTree is valid):
|
||||
///
|
||||
/// {ImporterWithSourceTree, protobuf::Descriptor - descriptor of the message type}.
|
||||
///
|
||||
/// The function always return valid message descriptor, it throws an exception if it cannot load or parse the file.
|
||||
DescriptorHolder
|
||||
getMessageTypeForFormatSchema(const FormatSchemaInfo & info, WithEnvelope with_envelope, const String & google_protos_path);
|
||||
|
||||
private:
|
||||
class ImporterWithSourceTree;
|
||||
std::unordered_map<String, std::unique_ptr<ImporterWithSourceTree>> importers;
|
||||
std::unordered_map<String, std::shared_ptr<ImporterWithSourceTree>> importers;
|
||||
std::mutex mutex;
|
||||
};
|
||||
|
||||
|
@ -3875,26 +3875,32 @@ std::unique_ptr<ProtobufSerializer> ProtobufSerializer::create(
|
||||
const Strings & column_names,
|
||||
const DataTypes & data_types,
|
||||
std::vector<size_t> & missing_column_indices,
|
||||
const google::protobuf::Descriptor & message_descriptor,
|
||||
const ProtobufSchemas::DescriptorHolder & descriptor,
|
||||
bool with_length_delimiter,
|
||||
bool with_envelope,
|
||||
bool flatten_google_wrappers,
|
||||
ProtobufReader & reader)
|
||||
{
|
||||
return ProtobufSerializerBuilder(reader).buildMessageSerializer(column_names, data_types, missing_column_indices, message_descriptor, with_length_delimiter, with_envelope, flatten_google_wrappers);
|
||||
return ProtobufSerializerBuilder(reader).buildMessageSerializer(
|
||||
column_names, data_types, missing_column_indices,
|
||||
*descriptor.message_descriptor,
|
||||
with_length_delimiter, with_envelope, flatten_google_wrappers);
|
||||
}
|
||||
|
||||
std::unique_ptr<ProtobufSerializer> ProtobufSerializer::create(
|
||||
const Strings & column_names,
|
||||
const DataTypes & data_types,
|
||||
const google::protobuf::Descriptor & message_descriptor,
|
||||
const ProtobufSchemas::DescriptorHolder & descriptor,
|
||||
bool with_length_delimiter,
|
||||
bool with_envelope,
|
||||
bool defaults_for_nullable_google_wrappers,
|
||||
ProtobufWriter & writer)
|
||||
{
|
||||
std::vector<size_t> missing_column_indices;
|
||||
return ProtobufSerializerBuilder(writer).buildMessageSerializer(column_names, data_types, missing_column_indices, message_descriptor, with_length_delimiter, with_envelope, defaults_for_nullable_google_wrappers);
|
||||
return ProtobufSerializerBuilder(writer).buildMessageSerializer(
|
||||
column_names, data_types, missing_column_indices,
|
||||
*descriptor.message_descriptor,
|
||||
with_length_delimiter, with_envelope, defaults_for_nullable_google_wrappers);
|
||||
}
|
||||
|
||||
NamesAndTypesList protobufSchemaToCHSchema(const google::protobuf::Descriptor * message_descriptor, bool skip_unsupported_fields)
|
||||
|
@ -4,7 +4,8 @@
|
||||
|
||||
#if USE_PROTOBUF
|
||||
# include <Columns/IColumn.h>
|
||||
#include <Core/NamesAndTypes.h>
|
||||
# include <Core/NamesAndTypes.h>
|
||||
# include <Formats/ProtobufSchemas.h>
|
||||
|
||||
|
||||
namespace google::protobuf { class Descriptor; }
|
||||
@ -39,7 +40,7 @@ public:
|
||||
const Strings & column_names,
|
||||
const DataTypes & data_types,
|
||||
std::vector<size_t> & missing_column_indices,
|
||||
const google::protobuf::Descriptor & message_descriptor,
|
||||
const ProtobufSchemas::DescriptorHolder & descriptor,
|
||||
bool with_length_delimiter,
|
||||
bool with_envelope,
|
||||
bool flatten_google_wrappers,
|
||||
@ -48,7 +49,7 @@ public:
|
||||
static std::unique_ptr<ProtobufSerializer> create(
|
||||
const Strings & column_names,
|
||||
const DataTypes & data_types,
|
||||
const google::protobuf::Descriptor & message_descriptor,
|
||||
const ProtobufSchemas::DescriptorHolder & descriptor,
|
||||
bool with_length_delimiter,
|
||||
bool with_envelope,
|
||||
bool defaults_for_nullable_google_wrappers,
|
||||
|
@ -23,7 +23,7 @@ ProtobufListInputFormat::ProtobufListInputFormat(
|
||||
header_.getNames(),
|
||||
header_.getDataTypes(),
|
||||
missing_column_indices,
|
||||
*ProtobufSchemas::instance().getMessageTypeForFormatSchema(
|
||||
ProtobufSchemas::instance().getMessageTypeForFormatSchema(
|
||||
schema_info_.getSchemaInfo(), ProtobufSchemas::WithEnvelope::Yes, google_protos_path),
|
||||
/* with_length_delimiter = */ true,
|
||||
/* with_envelope = */ true,
|
||||
@ -93,9 +93,9 @@ ProtobufListSchemaReader::ProtobufListSchemaReader(const FormatSettings & format
|
||||
|
||||
NamesAndTypesList ProtobufListSchemaReader::readSchema()
|
||||
{
|
||||
const auto * message_descriptor
|
||||
= ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info, ProtobufSchemas::WithEnvelope::Yes, google_protos_path);
|
||||
return protobufSchemaToCHSchema(message_descriptor, skip_unsupported_fields);
|
||||
auto descriptor = ProtobufSchemas::instance().getMessageTypeForFormatSchema(
|
||||
schema_info, ProtobufSchemas::WithEnvelope::Yes, google_protos_path);
|
||||
return protobufSchemaToCHSchema(descriptor.message_descriptor, skip_unsupported_fields);
|
||||
}
|
||||
|
||||
void registerInputFormatProtobufList(FormatFactory & factory)
|
||||
|
@ -20,7 +20,7 @@ ProtobufListOutputFormat::ProtobufListOutputFormat(
|
||||
, serializer(ProtobufSerializer::create(
|
||||
header_.getNames(),
|
||||
header_.getDataTypes(),
|
||||
*ProtobufSchemas::instance().getMessageTypeForFormatSchema(
|
||||
ProtobufSchemas::instance().getMessageTypeForFormatSchema(
|
||||
schema_info_.getSchemaInfo(), ProtobufSchemas::WithEnvelope::Yes, google_protos_path),
|
||||
/* with_length_delimiter = */ true,
|
||||
/* with_envelope = */ true,
|
||||
|
@ -19,7 +19,7 @@ ProtobufRowInputFormat::ProtobufRowInputFormat(
|
||||
bool flatten_google_wrappers_,
|
||||
const String & google_protos_path)
|
||||
: IRowInputFormat(header_, in_, params_)
|
||||
, message_descriptor(ProtobufSchemas::instance().getMessageTypeForFormatSchema(
|
||||
, descriptor(ProtobufSchemas::instance().getMessageTypeForFormatSchema(
|
||||
schema_info_.getSchemaInfo(), ProtobufSchemas::WithEnvelope::No, google_protos_path))
|
||||
, with_length_delimiter(with_length_delimiter_)
|
||||
, flatten_google_wrappers(flatten_google_wrappers_)
|
||||
@ -33,7 +33,7 @@ void ProtobufRowInputFormat::createReaderAndSerializer()
|
||||
getPort().getHeader().getNames(),
|
||||
getPort().getHeader().getDataTypes(),
|
||||
missing_column_indices,
|
||||
*message_descriptor,
|
||||
descriptor,
|
||||
with_length_delimiter,
|
||||
/* with_envelope = */ false,
|
||||
flatten_google_wrappers,
|
||||
@ -132,9 +132,9 @@ ProtobufSchemaReader::ProtobufSchemaReader(const FormatSettings & format_setting
|
||||
|
||||
NamesAndTypesList ProtobufSchemaReader::readSchema()
|
||||
{
|
||||
const auto * message_descriptor
|
||||
= ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info, ProtobufSchemas::WithEnvelope::No, google_protos_path);
|
||||
return protobufSchemaToCHSchema(message_descriptor, skip_unsupported_fields);
|
||||
auto descriptor = ProtobufSchemas::instance().getMessageTypeForFormatSchema(
|
||||
schema_info, ProtobufSchemas::WithEnvelope::No, google_protos_path);
|
||||
return protobufSchemaToCHSchema(descriptor.message_descriptor, skip_unsupported_fields);
|
||||
}
|
||||
|
||||
void registerProtobufSchemaReader(FormatFactory & factory)
|
||||
|
@ -6,7 +6,7 @@
|
||||
# include <Processors/Formats/IRowInputFormat.h>
|
||||
# include <Processors/Formats/ISchemaReader.h>
|
||||
# include <Formats/FormatSchemaInfo.h>
|
||||
# include <google/protobuf/descriptor.h>
|
||||
# include <Formats/ProtobufSchemas.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -57,7 +57,7 @@ private:
|
||||
std::vector<size_t> missing_column_indices;
|
||||
std::unique_ptr<ProtobufSerializer> serializer;
|
||||
|
||||
const google::protobuf::Descriptor * message_descriptor;
|
||||
const ProtobufSchemas::DescriptorHolder descriptor;
|
||||
bool with_length_delimiter;
|
||||
bool flatten_google_wrappers;
|
||||
};
|
||||
|
@ -27,7 +27,7 @@ ProtobufRowOutputFormat::ProtobufRowOutputFormat(
|
||||
, serializer(ProtobufSerializer::create(
|
||||
header_.getNames(),
|
||||
header_.getDataTypes(),
|
||||
*ProtobufSchemas::instance().getMessageTypeForFormatSchema(
|
||||
ProtobufSchemas::instance().getMessageTypeForFormatSchema(
|
||||
schema_info_.getSchemaInfo(), ProtobufSchemas::WithEnvelope::No, settings_.protobuf.google_protos_path),
|
||||
with_length_delimiter_,
|
||||
/* with_envelope = */ false,
|
||||
|
@ -0,0 +1,42 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: race
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
SCHEMADIR=$CLICKHOUSE_SCHEMA_FILES
|
||||
CLIENT_SCHEMADIR=$CURDIR/format_schemas
|
||||
export SERVER_SCHEMADIR=$CLICKHOUSE_DATABASE
|
||||
mkdir -p $SCHEMADIR/$SERVER_SCHEMADIR
|
||||
cp -r $CLIENT_SCHEMADIR/03250.proto $SCHEMADIR/$SERVER_SCHEMADIR/
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "SYSTEM DROP FORMAT SCHEMA CACHE FOR Protobuf"
|
||||
|
||||
BINARY_FILE_PATH=$(mktemp "$CLICKHOUSE_USER_FILES/03250.XXXXXX.binary")
|
||||
export BINARY_FILE_PATH
|
||||
$CLICKHOUSE_CLIENT --query "SELECT * FROM numbers(10) FORMAT Protobuf SETTINGS format_schema = '$CLIENT_SCHEMADIR/03250:Numbers'" > $BINARY_FILE_PATH
|
||||
chmod 666 "$BINARY_FILE_PATH"
|
||||
|
||||
function protobuf_reader()
|
||||
{
|
||||
while true; do
|
||||
$CLICKHOUSE_CLIENT --query "SELECT count() FROM file('$(basename $BINARY_FILE_PATH)', 'Protobuf') FORMAT Null SETTINGS max_threads=1, format_schema='$SERVER_SCHEMADIR/03250:Numbers'"
|
||||
done
|
||||
}
|
||||
export -f protobuf_reader
|
||||
|
||||
function protobuf_cache_drainer()
|
||||
{
|
||||
while true; do
|
||||
$CLICKHOUSE_CLIENT --query "SYSTEM DROP FORMAT SCHEMA CACHE FOR Protobuf"
|
||||
done
|
||||
}
|
||||
export -f protobuf_cache_drainer
|
||||
|
||||
timeout 20 bash -c protobuf_reader &
|
||||
timeout 20 bash -c protobuf_cache_drainer &
|
||||
wait
|
||||
|
||||
rm -f "${BINARY_FILE_PATH:?}"
|
||||
rm -fr "${SCHEMADIR:?}/${SERVER_SCHEMADIR:?}/"
|
5
tests/queries/0_stateless/format_schemas/03250.proto
Normal file
5
tests/queries/0_stateless/format_schemas/03250.proto
Normal file
@ -0,0 +1,5 @@
|
||||
syntax = "proto3";
|
||||
|
||||
message Numbers {
|
||||
uint64 number = 1;
|
||||
};
|
Loading…
Reference in New Issue
Block a user