mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-16 11:22:12 +00:00
Merge branch 'protocol-break-oom' into protocol-compression-auto
This commit is contained in:
commit
58681e7315
@ -41,7 +41,7 @@ void NativeBlockOutputStream::flush()
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void NativeBlockOutputStream::writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, UInt64 offset, UInt64 limit)
|
static void writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, UInt64 offset, UInt64 limit)
|
||||||
{
|
{
|
||||||
/** If there are columns-constants - then we materialize them.
|
/** If there are columns-constants - then we materialize them.
|
||||||
* (Since the data type does not know how to serialize / deserialize constants.)
|
* (Since the data type does not know how to serialize / deserialize constants.)
|
||||||
|
@ -30,8 +30,6 @@ public:
|
|||||||
void write(const Block & block) override;
|
void write(const Block & block) override;
|
||||||
void flush() override;
|
void flush() override;
|
||||||
|
|
||||||
static void writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, UInt64 offset, UInt64 limit);
|
|
||||||
|
|
||||||
String getContentType() const override { return "application/octet-stream"; }
|
String getContentType() const override { return "application/octet-stream"; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -80,9 +80,11 @@ namespace DB
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void ParallelFormattingOutputFormat::collectorThreadFunction()
|
void ParallelFormattingOutputFormat::collectorThreadFunction(const ThreadGroupStatusPtr & thread_group)
|
||||||
{
|
{
|
||||||
setThreadName("Collector");
|
setThreadName("Collector");
|
||||||
|
if (thread_group)
|
||||||
|
CurrentThread::attachToIfDetached(thread_group);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@ -135,9 +137,11 @@ namespace DB
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void ParallelFormattingOutputFormat::formatterThreadFunction(size_t current_unit_number)
|
void ParallelFormattingOutputFormat::formatterThreadFunction(size_t current_unit_number, const ThreadGroupStatusPtr & thread_group)
|
||||||
{
|
{
|
||||||
setThreadName("Formatter");
|
setThreadName("Formatter");
|
||||||
|
if (thread_group)
|
||||||
|
CurrentThread::attachToIfDetached(thread_group);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -76,7 +76,10 @@ public:
|
|||||||
/// Just heuristic. We need one thread for collecting, one thread for receiving chunks
|
/// Just heuristic. We need one thread for collecting, one thread for receiving chunks
|
||||||
/// and n threads for formatting.
|
/// and n threads for formatting.
|
||||||
processing_units.resize(params.max_threads_for_parallel_formatting + 2);
|
processing_units.resize(params.max_threads_for_parallel_formatting + 2);
|
||||||
collector_thread = ThreadFromGlobalPool([&] { collectorThreadFunction(); });
|
collector_thread = ThreadFromGlobalPool([thread_group = CurrentThread::getGroup(), this]
|
||||||
|
{
|
||||||
|
collectorThreadFunction(thread_group);
|
||||||
|
});
|
||||||
LOG_TRACE(&Poco::Logger::get("ParallelFormattingOutputFormat"), "Parallel formatting is being used");
|
LOG_TRACE(&Poco::Logger::get("ParallelFormattingOutputFormat"), "Parallel formatting is being used");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -200,14 +203,17 @@ private:
|
|||||||
|
|
||||||
void scheduleFormatterThreadForUnitWithNumber(size_t ticket_number)
|
void scheduleFormatterThreadForUnitWithNumber(size_t ticket_number)
|
||||||
{
|
{
|
||||||
pool.scheduleOrThrowOnError([this, ticket_number] { formatterThreadFunction(ticket_number); });
|
pool.scheduleOrThrowOnError([this, thread_group = CurrentThread::getGroup(), ticket_number]
|
||||||
|
{
|
||||||
|
formatterThreadFunction(ticket_number, thread_group);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Collects all temporary buffers into main WriteBuffer.
|
/// Collects all temporary buffers into main WriteBuffer.
|
||||||
void collectorThreadFunction();
|
void collectorThreadFunction(const ThreadGroupStatusPtr & thread_group);
|
||||||
|
|
||||||
/// This function is executed in ThreadPool and the only purpose of it is to format one Chunk into a continuous buffer in memory.
|
/// This function is executed in ThreadPool and the only purpose of it is to format one Chunk into a continuous buffer in memory.
|
||||||
void formatterThreadFunction(size_t current_unit_number);
|
void formatterThreadFunction(size_t current_unit_number, const ThreadGroupStatusPtr & thread_group);
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1409,6 +1409,11 @@ void TCPHandler::sendData(const Block & block)
|
|||||||
{
|
{
|
||||||
initBlockOutput(block);
|
initBlockOutput(block);
|
||||||
|
|
||||||
|
auto prev_bytes_written_out = out->count();
|
||||||
|
auto prev_bytes_written_compressed_out = state.maybe_compressed_out->count();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
writeVarUInt(Protocol::Server::Data, *out);
|
writeVarUInt(Protocol::Server::Data, *out);
|
||||||
/// Send external table name (empty name is the main table)
|
/// Send external table name (empty name is the main table)
|
||||||
writeStringBinary("", *out);
|
writeStringBinary("", *out);
|
||||||
@ -1425,6 +1430,28 @@ void TCPHandler::sendData(const Block & block)
|
|||||||
state.block_out->write(block);
|
state.block_out->write(block);
|
||||||
state.maybe_compressed_out->next();
|
state.maybe_compressed_out->next();
|
||||||
out->next();
|
out->next();
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
/// In case of unsuccessful write, if the buffer with written data was not flushed,
|
||||||
|
/// we will rollback write to avoid breaking the protocol.
|
||||||
|
/// (otherwise the client will not be able to receive exception after unfinished data
|
||||||
|
/// as it will expect the continuation of the data).
|
||||||
|
/// It looks like hangs on client side or a message like "Data compressed with different methods".
|
||||||
|
|
||||||
|
if (state.compression == Protocol::Compression::Enable)
|
||||||
|
{
|
||||||
|
auto extra_bytes_written_compressed = state.maybe_compressed_out->count() - prev_bytes_written_compressed_out;
|
||||||
|
if (state.maybe_compressed_out->offset() >= extra_bytes_written_compressed)
|
||||||
|
state.maybe_compressed_out->position() -= extra_bytes_written_compressed;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto extra_bytes_written_out = out->count() - prev_bytes_written_out;
|
||||||
|
if (out->offset() >= extra_bytes_written_out)
|
||||||
|
out->position() -= extra_bytes_written_out;
|
||||||
|
|
||||||
|
throw;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -0,0 +1 @@
|
|||||||
|
Code: 241
|
7
tests/queries/0_stateless/01783_parallel_formatting_memory.sh
Executable file
7
tests/queries/0_stateless/01783_parallel_formatting_memory.sh
Executable file
@ -0,0 +1,7 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
|
# shellcheck source=../shell_config.sh
|
||||||
|
. "$CUR_DIR"/../shell_config.sh
|
||||||
|
|
||||||
|
$CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL&max_memory_usage=1G" -d "SELECT range(65535) FROM system.one ARRAY JOIN range(65536) AS number" | grep -oF 'Code: 241'
|
@ -0,0 +1,2 @@
|
|||||||
|
SET max_memory_usage = '1G';
|
||||||
|
SELECT range(65535) FROM system.one ARRAY JOIN range(65536) AS number; -- { serverError 241 }
|
@ -0,0 +1,2 @@
|
|||||||
|
Code: 241
|
||||||
|
Code: 241
|
8
tests/queries/0_stateless/01785_parallel_formatting_memory.sh
Executable file
8
tests/queries/0_stateless/01785_parallel_formatting_memory.sh
Executable file
@ -0,0 +1,8 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
|
# shellcheck source=../shell_config.sh
|
||||||
|
. "$CUR_DIR"/../shell_config.sh
|
||||||
|
|
||||||
|
$CLICKHOUSE_CLIENT --compress 0 --max_memory_usage 1G --query "SELECT range(65535) FROM system.one ARRAY JOIN range(65536) AS number" 2>&1 | grep -oF 'Code: 241' | head -n1
|
||||||
|
$CLICKHOUSE_CLIENT --compress 1 --max_memory_usage 1G --query "SELECT range(65535) FROM system.one ARRAY JOIN range(65536) AS number" 2>&1 | grep -oF 'Code: 241' | head -n1
|
Loading…
Reference in New Issue
Block a user