Introduce IO format "ProtobufList" with protobuf schema
// schemafile.proto
message Envelope {
message MessageType {
uint32 colA = 1;
string colB = 2;
}
repeated MessageType mt = 1;
}
where "Envelope" is a hard-coded/expected top-level message and
"MessageType" is a message with user-provided name containing the table
fields to export/import, e.g.
SELECT * FROM db1.tab1 FORMAT ProtobufList SETTINGS format_schema =
'schemafile:MessageType'
As a result, the new format wraps a list of messages (one per row) into
a single, containing message. Compare that to the schema of the existing
IO formats "Protobuf" and "ProtobufSingle":
message MessageType {
uint32 colA = 1;
string colB = 2;
}
The new format does not save space compared to the existing formats, but
it is conceptually a bit more beautiful and also more convenenient.
Implementation details:
- Created new files ProtobufList(Input|Output)Format which use the
existing ProtobufSerializer mechanism. The goal was to reuse as much
code as possible and avoid copypasta.
- I was torn between inheriting from I(Input|Output)Format vs.
IRow(Input|Output)Format for ProtobufList(Input|Output)Format. The
former is chunk-based which can be better for performance. Since the
ProtobufSerializer mechanism is row-based but data is generally passed
around in chunks, I decided for the latter to leverage the existing
chunk <--> row mapping code in IRow(InputOutput)Format.
- A new ProtobufSerializer called ProtobufSerializerEnvelope was
introduced (--> ProtobufSerializer.cpp). It represents the top-level
message which encloses the list of inner nested messages, i.e. the
rows.
- With the new format, parsing the schema file and matching the fields in
the schema file to table column works like for the old formats. The only
difference is that parsing starts one level below the "Envelope" (-->
ProtobufSchema.cpp). This is more natural than forcing customers to
have table columns start with "Envelope".
- Creation of the ProtobufSerializer tree also works like before. What
is different is that we finally add a ProtobufSerializerEnvelope as
new root of the tree. It's only purpose is to write/read the top-level
message for the first/last row to write/read.
Caveats:
- The low-level serialization code in ProtobufWriter uses an internal
buffer which is flushed to the output file only in endMessage().
In the existing "Protobuf" format, this happens once per row, in the
new format this happens only at the end of the serialization
since row-level messages now call start/endNestedMessage(). As a
future TODO to, the buffer should be flushed also in
start/endNestedMessage() to reduce memory consumption.
Reproducer:
# NOTE: we need clickhouse from 33957 since right now LSan is broken due to getauxval().
$ url=https://s3.amazonaws.com/clickhouse-builds/33957/e04b862673644d313712607a0078f5d1c48b5377/package_asan/clickhouse
$ wget $url -o clickhouse-asan
$ chmod +x clickhouse-asan
$ ./clickhouse-asan server &
$ ./clickhouse-asan client
:) create table data (key Int, value String) engine=MergeTree() order by key
:) insert into data select number%5, toString(number) from numbers(10e6)
# usually it is enough one query, benchmark is just for stability of the results
# note, that if the exception was not happen from AggregatingInOrderTransform then add --continue_on_errors and wait
$ ./clickhouse-asan benchmark --query 'select key, uniqCombined64(value), groupArray(value) from data group by key' --optimize_aggregation_in_order=1 --memory_tracker_fault_probability=0.01, max_untracked_memory='2Mi'
LSan report:
==24595==ERROR: LeakSanitizer: detected memory leaks
Direct leak of 3932160 byte(s) in 6 object(s) allocated from:
0 0xcadba93 in realloc ()
1 0xcc108d9 in Allocator<false, false>::realloc() obj-x86_64-linux-gnu/../src/Common/Allocator.h:134:30
2 0xde19eae in void DB::PODArrayBase<>::realloc<DB::Arena*&>(unsigned long, DB::Arena*&) obj-x86_64-linux-gnu/../src/Common/PODArray.h:161:25
3 0xde5f039 in void DB::PODArrayBase<>::reserveForNextSize<DB::Arena*&>(DB::Arena*&) obj-x86_64-linux-gnu/../src/Common/PODArray.h
4 0xde5f039 in void DB::PODArray<>::push_back<>(DB::GroupArrayNodeString*&, DB::Arena*&) obj-x86_64-linux-gnu/../src/Common/PODArray.h:432:19
5 0xde5f039 in DB::GroupArrayGeneralImpl<>::add() const obj-x86_64-linux-gnu/../src/AggregateFunctions/AggregateFunctionGroupArray.h:465:31
6 0xde5f039 in DB::IAggregateFunctionHelper<>::addBatchSinglePlaceFromInterval() const obj-x86_64-linux-gnu/../src/AggregateFunctions/IAggregateFunction.h:481:53
7 0x299df134 in DB::Aggregator::executeOnIntervalWithoutKeyImpl() obj-x86_64-linux-gnu/../src/Interpreters/Aggregator.cpp:869:31
8 0x2ca75f7d in DB::AggregatingInOrderTransform::consume() obj-x86_64-linux-gnu/../src/Processors/Transforms/AggregatingInOrderTransform.cpp:124:13
...
SUMMARY: AddressSanitizer: 4523184 byte(s) leaked in 12 allocation(s).
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This patch adds use_skip_indexes_if_final setting, that is OFF by
default. Since skipping data for queries with FINAL may produce
incorrect result.
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
After detachQueryIfNotDetached() had been removed it is not enough to
use attachTo() for ThreadPool (scheduleOrThrowOnError()) since the query
may be already attached, if the thread doing multiple jobs, so
CurrentThread::attachToIfDetached() should be used instead.
This should fix all the places from the failures on CI [1]:
$ fgrep DB::CurrentThread::attachTo -A1 ~/Downloads/47.txt | fgrep -v attachTo | cut -d' ' -f5,6 | sort | uniq -c
92 --
2 /fasttest-workspace/build/../../ClickHouse/contrib/libcxx/include/deque:1393: DB::ParallelParsingInputFormat::parserThreadFunction(std::__1::shared_ptr<DB::ThreadGroupStatus>,
4 /fasttest-workspace/build/../../ClickHouse/src/Storages/MergeTree/MergeTreeData.cpp:1595: void
87 /fasttest-workspace/build/../../ClickHouse/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp:993: void
[1]: https://github.com/ClickHouse/ClickHouse/runs/4954466034?check_suite_focus=true
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
MemoryTracker starts accounting memory directly only after per-thread
allocation exceeded max_untracker_memory (or memory_profiler_step).
But even memory under this limit should be accounted too, and there is
code to do this in ThreadStatus dtor, however due to
PullingAsyncPipelineExecutor detached the query from thread group that
memory was not accounted.
So remove CurrentThread::detachQueryIfNotDetached() from threads that
uses ThreadFromGlobalPool since it has ThreadStatus, and the query will
be detached using CurrentThread::defaultThreadDeleter.
Note, that before this patch memory accounting works for HTTP queries
due to it had been accounted from ParallelFormattingOutputFormat, but
not for TCP.
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>