Merge pull request #28150 from FArthur-cmd/fix_compression_26149
Fix `Attempt to read after eof with enabled data compression`
commit fc37817ada
@@ -396,6 +396,9 @@ function run_tests
         # needs s3
         01944_insert_partition_by

+        # depends on Go
+        02013_zlib_read_after_eof
+
     )

     time clickhouse-test --hung-check -j 8 --order=random --use-skip-list \
@@ -24,6 +24,8 @@ RUN apt-get update -y \
             python3-pip \
             qemu-user-static \
             sudo \
+            # golang version 1.13 on Ubuntu 20 is enough for tests
+            golang \
             telnet \
             tree \
             unixodbc \
@@ -38,7 +38,7 @@ ZlibInflatingReadBuffer::ZlibInflatingReadBuffer(
 #pragma GCC diagnostic pop

     if (rc != Z_OK)
-        throw Exception(std::string("inflateInit2 failed: ") + zError(rc) + "; zlib version: " + ZLIB_VERSION, ErrorCodes::ZLIB_INFLATE_FAILED);
+        throw Exception(ErrorCodes::ZLIB_INFLATE_FAILED, "inflateInit2 failed: {}; zlib version: {}.", zError(rc), ZLIB_VERSION);
 }

 ZlibInflatingReadBuffer::~ZlibInflatingReadBuffer()
@@ -48,41 +48,60 @@ ZlibInflatingReadBuffer::~ZlibInflatingReadBuffer()

 bool ZlibInflatingReadBuffer::nextImpl()
 {
-    if (eof)
-        return false;
-
-    if (!zstr.avail_in)
-    {
-        in->nextIfAtEnd();
-        zstr.next_in = reinterpret_cast<unsigned char *>(in->position());
-        zstr.avail_in = in->buffer().end() - in->position();
-    }
-    zstr.next_out = reinterpret_cast<unsigned char *>(internal_buffer.begin());
-    zstr.avail_out = internal_buffer.size();
-
-    int rc = inflate(&zstr, Z_NO_FLUSH);
-
-    in->position() = in->buffer().end() - zstr.avail_in;
-    working_buffer.resize(internal_buffer.size() - zstr.avail_out);
-
-    if (rc == Z_STREAM_END)
-    {
-        if (in->eof())
-        {
-            eof = true;
-            return !working_buffer.empty();
-        }
-        else
-        {
-            rc = inflateReset(&zstr);
-            if (rc != Z_OK)
-                throw Exception(std::string("inflateReset failed: ") + zError(rc), ErrorCodes::ZLIB_INFLATE_FAILED);
-            return true;
-        }
-    }
-    if (rc != Z_OK)
-        throw Exception(std::string("inflate failed: ") + zError(rc), ErrorCodes::ZLIB_INFLATE_FAILED);
-
+    /// Need do-while loop to prevent situation, when
+    /// eof was not reached, but working buffer became empty (when nothing was decompressed in current iteration)
+    /// (this happens with compression algorithms, same idea is implemented in ZstdInflatingReadBuffer)
+    do
+    {
+        /// if we already found eof, we shouldn't do anything
+        if (eof)
+            return false;
+
+        /// if there is no available bytes in zstr, move ptr to next available data
+        if (!zstr.avail_in)
+        {
+            in->nextIfAtEnd();
+            zstr.next_in = reinterpret_cast<unsigned char *>(in->position());
+            zstr.avail_in = in->buffer().end() - in->position();
+        }
+        /// init output bytes (place, where decompressed data will be)
+        zstr.next_out = reinterpret_cast<unsigned char *>(internal_buffer.begin());
+        zstr.avail_out = internal_buffer.size();
+
+        int rc = inflate(&zstr, Z_NO_FLUSH);
+
+        /// move in stream on place, where reading stopped
+        in->position() = in->buffer().end() - zstr.avail_in;
+        /// change size of working buffer (its size is equal to internal_buffer size without unused decompressed space)
+        working_buffer.resize(internal_buffer.size() - zstr.avail_out);
+
+        /// If end was reached, it can be end of file or end of part (for example, chunk)
+        if (rc == Z_STREAM_END)
+        {
+            /// if it is end of file, remember this and return
+            /// * true if we can work with working buffer (we still have something to read, so next must return true)
+            /// * false if there is no data in working buffer
+            if (in->eof())
+            {
+                eof = true;
+                return !working_buffer.empty();
+            }
+            /// If it is not end of file, we need to reset zstr and return true, because we still have some data to read
+            else
+            {
+                rc = inflateReset(&zstr);
+                if (rc != Z_OK)
+                    throw Exception(ErrorCodes::ZLIB_INFLATE_FAILED, "inflateReset failed: {}", zError(rc));
+                return true;
+            }
+        }
+        /// If it is not end and not OK, something went wrong, throw exception
+        if (rc != Z_OK)
+            throw Exception(ErrorCodes::ZLIB_INFLATE_FAILED, "inflate failed: {}", zError(rc));
+    }
+    while (working_buffer.empty());
+
+    /// if the code reaches this section, the working buffer is not empty, so we have some data to process
     return true;
 }

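Why the new do-while loop matters: a gzip/zlib HTTP body may consist of several concatenated compressed members (a client that flushes and restarts its compressor produces exactly that), and a single inflate() call can then hit Z_STREAM_END at a member boundary without emitting any output bytes. Previously nextImpl() returned an empty working buffer in that case even though real end-of-input had not been reached, which upstream readers reported as `Attempt to read after eof`. The loop keeps inflating (resetting zstr at member boundaries) until it either produces data or reaches true EOF. Below is a rough Go analogue of the troublesome input, not ClickHouse code: it concatenates two gzip members, the second one empty, and shows that a correct reader must carry on across the empty member instead of stopping.

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io/ioutil"
)

// twoMemberPayload builds a gzip body made of two concatenated members,
// the second of which is empty: decompressing it hits a stream end that
// yields zero output bytes, the situation the do-while loop must skip over.
func twoMemberPayload() []byte {
	var buf bytes.Buffer

	gw := gzip.NewWriter(&buf)
	_, _ = gw.Write([]byte("hello from the first member\n"))
	gw.Close()

	gw = gzip.NewWriter(&buf) // empty second member
	gw.Close()

	return buf.Bytes()
}

func main() {
	zr, err := gzip.NewReader(bytes.NewReader(twoMemberPayload()))
	if err != nil {
		panic(err)
	}
	// gzip.Reader runs in multistream mode by default: it resets and keeps
	// reading at member boundaries until the underlying data is exhausted,
	// which is what the fixed nextImpl() now does via inflateReset.
	data, err := ioutil.ReadAll(zr)
	if err != nil {
		panic(err)
	}
	fmt.Printf("decompressed %d bytes\n", len(data))
}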
tests/queries/0_stateless/02013_zlib_read_after_eof.go (new file, 61 lines)
@@ -0,0 +1,61 @@
package main

import (
	"compress/gzip"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
)

func compress(data io.Reader) io.Reader {
	pr, pw := io.Pipe()
	gw := gzip.NewWriter(pw)

	go func() {
		_, _ = io.Copy(gw, data)
		gw.Close()
		pw.Close()
	}()

	return pr
}

func main() {
	database := os.Getenv("CLICKHOUSE_DATABASE")
	p, err := url.Parse("http://localhost:8123/")
	if err != nil {
		panic(err)
	}
	q := p.Query()

	q.Set("query", "INSERT INTO "+database+".graphite FORMAT RowBinary")
	p.RawQuery = q.Encode()
	queryUrl := p.String()

	var req *http.Request

	req, err = http.NewRequest("POST", queryUrl, compress(os.Stdin))
	req.Header.Add("Content-Encoding", "gzip")

	if err != nil {
		panic(err)
	}

	client := &http.Client{
		Transport: &http.Transport{DisableKeepAlives: true},
	}
	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := ioutil.ReadAll(resp.Body)

	if resp.StatusCode != 200 {
		panic(fmt.Errorf("clickhouse response status %d: %s", resp.StatusCode, string(body)))
	}
}
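One small nit in the harness above: req.Header.Add runs before the error from http.NewRequest is checked, so a failed request construction would panic on a nil *http.Request rather than on the intended err. A minimal sketch of the safer ordering (placeholder endpoint and body; the request is only built, never sent):

package main

import (
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// Placeholder endpoint and body; the point is only the ordering:
	// check err before touching req, since req is nil on failure.
	req, err := http.NewRequest("POST", "http://localhost:8123/", strings.NewReader("payload"))
	if err != nil {
		panic(err)
	}
	req.Header.Add("Content-Encoding", "gzip")
	fmt.Println(req.Method, req.URL)
}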
tests/queries/0_stateless/02013_zlib_read_after_eof.reference (new file, 1 line)
@@ -0,0 +1 @@
10000
tests/queries/0_stateless/02013_zlib_read_after_eof.sh (new executable file, 18 lines)
@@ -0,0 +1,18 @@
#!/usr/bin/env bash

CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

DATA_FILE=$CUR_DIR/data_zlib/02013_zlib_read_after_eof_data

$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS graphite;"

$CLICKHOUSE_CLIENT -q "CREATE TABLE graphite(\`Path\` String, \`Value\` Float64, \`Time\` UInt32, \`Date\` Date, \`Timestamp\` UInt32) \
ENGINE = MergeTree PARTITION BY toYYYYMM(Date) ORDER BY (Path, Time) SETTINGS index_granularity = 8192;"

cat "$DATA_FILE" | go run $CUR_DIR/02013_zlib_read_after_eof.go

$CLICKHOUSE_CLIENT -q "SELECT count() FROM graphite;"

$CLICKHOUSE_CLIENT -q "drop table graphite;"
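The script's success criterion is the one-line reference file above: after the compressed RowBinary stream is inserted through the Go client, count() must return 10000. As an illustration, a hedged Go equivalent of that final check over the same HTTP interface the insert used (port 8123 from the Go test, table name from the script, default database assumed):

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
)

func main() {
	// Same HTTP interface the insert used; default database assumed.
	q := url.Values{}
	q.Set("query", "SELECT count() FROM graphite")

	resp, err := http.Get("http://localhost:8123/?" + q.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Print(string(body)) // expected to print 10000 after a successful insert
}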
tests/queries/0_stateless/data_zlib/02013_zlib_read_after_eof_data (binary file not shown)
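The binary fixture itself is not reproduced here, but its shape follows from the table definition in the script: 10000 graphite rows in RowBinary encoding (String as varint length plus raw bytes, fixed-width numbers little-endian, Date as a UInt16 day number). A hypothetical generator for data of that shape, with made-up field values (the real fixture's values are unknown; only the encoding is the point):

package main

import (
	"bufio"
	"encoding/binary"
	"os"
)

// writeString emits a RowBinary String: unsigned varint length, then raw bytes.
func writeString(w *bufio.Writer, s string) {
	var lenBuf [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(lenBuf[:], uint64(len(s)))
	w.Write(lenBuf[:n])
	w.WriteString(s)
}

func main() {
	w := bufio.NewWriter(os.Stdout)
	defer w.Flush()

	for i := 0; i < 10000; i++ {
		writeString(w, "one_min.some.metric")                      // Path (String)
		binary.Write(w, binary.LittleEndian, float64(1.5))         // Value (Float64)
		binary.Write(w, binary.LittleEndian, uint32(1628000000+i)) // Time (UInt32)
		binary.Write(w, binary.LittleEndian, uint16(18840))        // Date (UInt16 days since 1970-01-01)
		binary.Write(w, binary.LittleEndian, uint32(1628000000+i)) // Timestamp (UInt32)
	}
}

Piping its output through the client above (for example `go run gen_data.go | go run 02013_zlib_read_after_eof.go`, with gen_data.go being a hypothetical file name) would exercise the same code path as the checked-in fixture.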