Merge branch 'master' of github.com:ClickHouse/ClickHouse into revert-35914-FIPS_compliance
Commit: f024956f62
@@ -7,4 +7,6 @@ sidebar_label: C++ Client Library

See README at [clickhouse-cpp](https://github.com/ClickHouse/clickhouse-cpp) repository.

[Original article](https://clickhouse.com/docs/en/interfaces/cpp/) <!--hide-->

# userver Asynchronous Framework

[userver (beta)](https://github.com/userver-framework/userver) has builtin support for ClickHouse.

@@ -28,6 +28,9 @@ ClickHouse, Inc. does **not** maintain the tools and libraries listed below and
    - [Kafka](https://kafka.apache.org)
        - [clickhouse_sinker](https://github.com/housepower/clickhouse_sinker) (uses [Go client](https://github.com/ClickHouse/clickhouse-go/))
        - [stream-loader-clickhouse](https://github.com/adform/stream-loader)
    - Batch processing
        - [Spark](https://spark.apache.org)
            - [spark-clickhouse-connector](https://github.com/housepower/spark-clickhouse-connector)
    - Stream processing
        - [Flink](https://flink.apache.org)
            - [flink-clickhouse-sink](https://github.com/ivi-ru/flink-clickhouse-sink)

@@ -183,6 +183,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
    \
    M(Int64, network_zstd_compression_level, 1, "Allows you to select the level of ZSTD compression.", 0) \
    \
    M(Int64, zstd_window_log_max, 0, "Allows you to select the max window log of ZSTD (it will not be used for MergeTree family)", 0) \
    \
    M(UInt64, priority, 0, "Priority of the query. 1 - the highest, higher value - lower priority; 0 - do not use priorities.", 0) \
    M(Int64, os_thread_priority, 0, "If non zero - set corresponding 'nice' value for query processing threads. Can be used to adjust query priority for OS scheduler.", 0) \
    \
@@ -481,7 +483,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
    M(Seconds, lock_acquire_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "How long locking request should wait before failing", 0) \
    M(Bool, materialize_ttl_after_modify, true, "Apply TTL for old data, after ALTER MODIFY TTL query", 0) \
    M(String, function_implementation, "", "Choose function implementation for specific target or variant (experimental). If empty enable all of them.", 0) \
    M(Bool, allow_experimental_geo_types, false, "Allow geo data types such as Point, Ring, Polygon, MultiPolygon", 0) \
    M(Bool, allow_experimental_geo_types, true, "Allow geo data types such as Point, Ring, Polygon, MultiPolygon", 0) \
    M(Bool, data_type_default_nullable, false, "Data types without NULL or NOT NULL will make Nullable", 0) \
    M(Bool, cast_keep_nullable, false, "CAST operator keep Nullable for result data type", 0) \
    M(Bool, cast_ipv4_ipv6_default_on_conversion_error, false, "CAST operator into IPv4, CAST operator into IPV6 type, toIPv4, toIPv6 functions will return default value instead of throwing exception on conversion error.", 0) \
@@ -494,8 +496,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
    M(Bool, allow_non_metadata_alters, true, "Allow to execute alters which affects not only tables metadata, but also data on disk", 0) \
    M(Bool, enable_global_with_statement, true, "Propagate WITH statements to UNION queries and all subqueries", 0) \
    M(Bool, aggregate_functions_null_for_empty, false, "Rewrite all aggregate functions in a query, adding -OrNull suffix to them", 0) \
    M(Bool, optimize_syntax_fuse_functions, false, "Allow apply syntax optimisation: fuse aggregate functions", 0) \
    M(Bool, optimize_fuse_sum_count_avg, false, "Fuse functions `sum, avg, count` with identical arguments into one `sumCount` (`optimize_syntax_fuse_functions should be enabled)", 0) \
    M(Bool, optimize_syntax_fuse_functions, false, "Not ready for production, do not use. Allow apply syntax optimisation: fuse aggregate functions", 0) \
    M(Bool, optimize_fuse_sum_count_avg, false, "Not ready for production, do not use. Fuse functions `sum, avg, count` with identical arguments into one `sumCount` (`optimize_syntax_fuse_functions should be enabled)", 0) \
    M(Bool, flatten_nested, true, "If true, columns of type Nested will be flatten to separate array columns instead of one array of tuples", 0) \
    M(Bool, asterisk_include_materialized_columns, false, "Include MATERIALIZED columns for wildcard query", 0) \
    M(Bool, asterisk_include_alias_columns, false, "Include ALIAS columns for wildcard query", 0) \
@@ -550,7 +552,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
    M(UInt64, function_range_max_elements_in_block, 500000000, "Maximum number of values generated by function 'range' per block of data (sum of array sizes for every row in a block, see also 'max_block_size' and 'min_insert_block_size_rows'). It is a safety threshold.", 0) \
    M(ShortCircuitFunctionEvaluation, short_circuit_function_evaluation, ShortCircuitFunctionEvaluation::ENABLE, "Setting for short-circuit function evaluation configuration. Possible values: 'enable' - use short-circuit function evaluation for functions that are suitable for it, 'disable' - disable short-circuit function evaluation, 'force_enable' - use short-circuit function evaluation for all functions.", 0) \
    \
    M(String, local_filesystem_read_method, "pread", "Method of reading data from local filesystem, one of: read, pread, mmap, pread_threadpool.", 0) \
    M(String, local_filesystem_read_method, "pread_threadpool", "Method of reading data from local filesystem, one of: read, pread, mmap, pread_threadpool.", 0) \
    M(String, remote_filesystem_read_method, "threadpool", "Method of reading data from remote filesystem, one of: read, threadpool.", 0) \
    M(Bool, local_filesystem_read_prefetch, false, "Should use prefetching when reading data from local filesystem.", 0) \
    M(Bool, remote_filesystem_read_prefetch, true, "Should use prefetching when reading data from remote filesystem.", 0) \

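The hunk above registers the new `zstd_window_log_max` query setting next to the existing ZSTD compression-level knob. As a rough illustration (a hypothetical sketch, not the real ClickHouse settings macro machinery), each `M(type, name, default, description, flags)` row amounts to a typed, documented member with a default value:

```cpp
// Hypothetical sketch only: what an M(type, name, default, description, flags)
// row conceptually declares. The real SettingsTraits machinery is more involved.
#include <cstdint>

struct ToySettings
{
    // M(Int64, network_zstd_compression_level, 1, "Allows you to select the level of ZSTD compression.", 0)
    int64_t network_zstd_compression_level = 1;

    // M(Int64, zstd_window_log_max, 0, "Allows you to select the max window log of ZSTD ...", 0)
    // 0 keeps the zstd library's default window limit; larger values accept bigger windows.
    int64_t zstd_window_log_max = 0;
};
```
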
@@ -185,8 +185,11 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
        }

        prefetch_buffer.swap(memory);

        /// Adjust the working buffer so that it ignores `offset` bytes.
        setWithBytesToIgnore(memory.data(), size, offset);
        internal_buffer = Buffer(memory.data(), memory.data() + memory.size());
        working_buffer = Buffer(memory.data() + offset, memory.data() + size);
        pos = working_buffer.begin();
    }
    else
    {
@@ -202,7 +205,9 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
        if (size)
        {
            /// Adjust the working buffer so that it ignores `offset` bytes.
            setWithBytesToIgnore(memory.data(), size, offset);
            internal_buffer = Buffer(memory.data(), memory.data() + memory.size());
            working_buffer = Buffer(memory.data() + offset, memory.data() + size);
            pos = working_buffer.begin();
        }
    }

@@ -93,7 +93,9 @@ bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
    {
        prefetch_buffer.swap(memory);
        /// Adjust the working buffer so that it ignores `offset` bytes.
        setWithBytesToIgnore(memory.data(), size, offset);
        internal_buffer = Buffer(memory.data(), memory.data() + memory.size());
        working_buffer = Buffer(memory.data() + offset, memory.data() + size);
        pos = working_buffer.begin();
        return true;
    }

@@ -109,7 +111,9 @@ bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
    if (size)
    {
        /// Adjust the working buffer so that it ignores `offset` bytes.
        setWithBytesToIgnore(memory.data(), size, offset);
        internal_buffer = Buffer(memory.data(), memory.data() + memory.size());
        working_buffer = Buffer(memory.data() + offset, memory.data() + size);
        pos = working_buffer.begin();
        return true;
    }

@@ -196,7 +200,6 @@ off_t AsynchronousReadBufferFromFileDescriptor::seek(off_t offset, int whence)
    else if (prefetch_future.valid())
    {
        /// Read from prefetch buffer and recheck if the new position is valid inside.

        if (nextImpl())
            continue;
    }
@@ -219,7 +222,8 @@ off_t AsynchronousReadBufferFromFileDescriptor::seek(off_t offset, int whence)
    file_offset_of_buffer_end = seek_pos;
    bytes_to_ignore = new_pos - seek_pos;

    assert(bytes_to_ignore < internal_buffer.size());
    if (bytes_to_ignore >= internal_buffer.size())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error in AsynchronousReadBufferFromFileDescriptor, bytes_to_ignore ({}) >= internal_buffer.size() ({})", bytes_to_ignore, internal_buffer.size());

    return seek_pos;
}

@@ -99,7 +99,7 @@ CompressionMethod chooseCompressionMethod(const std::string & path, const std::s
}

static std::unique_ptr<CompressedReadBufferWrapper> createCompressedWrapper(
    std::unique_ptr<ReadBuffer> nested, CompressionMethod method, size_t buf_size, char * existing_memory, size_t alignment)
    std::unique_ptr<ReadBuffer> nested, CompressionMethod method, size_t buf_size, char * existing_memory, size_t alignment, int zstd_window_log_max)
{
    if (method == CompressionMethod::Gzip || method == CompressionMethod::Zlib)
        return std::make_unique<ZlibInflatingReadBuffer>(std::move(nested), method, buf_size, existing_memory, alignment);
@@ -110,7 +110,7 @@ static std::unique_ptr<CompressedReadBufferWrapper> createCompressedWrapper(
    if (method == CompressionMethod::Xz)
        return std::make_unique<LZMAInflatingReadBuffer>(std::move(nested), buf_size, existing_memory, alignment);
    if (method == CompressionMethod::Zstd)
        return std::make_unique<ZstdInflatingReadBuffer>(std::move(nested), buf_size, existing_memory, alignment);
        return std::make_unique<ZstdInflatingReadBuffer>(std::move(nested), buf_size, existing_memory, alignment, zstd_window_log_max);
    if (method == CompressionMethod::Lz4)
        return std::make_unique<Lz4InflatingReadBuffer>(std::move(nested), buf_size, existing_memory, alignment);
#if USE_BZIP2
@@ -126,14 +126,13 @@ static std::unique_ptr<CompressedReadBufferWrapper> createCompressedWrapper(
}

std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
    std::unique_ptr<ReadBuffer> nested, CompressionMethod method, size_t buf_size, char * existing_memory, size_t alignment)
    std::unique_ptr<ReadBuffer> nested, CompressionMethod method, int zstd_window_log_max, size_t buf_size, char * existing_memory, size_t alignment)
{
    if (method == CompressionMethod::None)
        return nested;
    return createCompressedWrapper(std::move(nested), method, buf_size, existing_memory, alignment);
    return createCompressedWrapper(std::move(nested), method, buf_size, existing_memory, alignment, zstd_window_log_max);
}

std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
    std::unique_ptr<WriteBuffer> nested, CompressionMethod method, int level, size_t buf_size, char * existing_memory, size_t alignment)
{

@@ -5,7 +5,6 @@

#include <Core/Defines.h>


namespace DB
{
class ReadBuffer;
@@ -50,10 +49,12 @@ CompressionMethod chooseCompressionMethod(const std::string & path, const std::s
std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
    std::unique_ptr<ReadBuffer> nested,
    CompressionMethod method,
    int zstd_window_log_max = 0,
    size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
    char * existing_memory = nullptr,
    size_t alignment = 0);

std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
    std::unique_ptr<WriteBuffer> nested,
    CompressionMethod method,

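To make the new signature concrete, here is a hedged sketch (not code from this commit; `openCompressedFile` and the `"auto"` hint are illustrative assumptions) of how a call site is expected to thread the setting into `wrapReadBufferWithCompressionMethod`, mirroring the storage-layer changes later in this diff:

```cpp
#include <IO/CompressionMethod.h>
#include <IO/ReadBufferFromFile.h>
#include <Interpreters/Context.h>

namespace DB
{

/// Illustrative helper only: open a possibly compressed file and apply the
/// query-level zstd window limit while decompressing it.
std::unique_ptr<ReadBuffer> openCompressedFile(const String & path, ContextPtr context)
{
    auto nested = std::make_unique<ReadBufferFromFile>(path);
    auto method = chooseCompressionMethod(path, /* hint = */ "auto");

    /// The new third argument; 0 would keep the zstd default window limit.
    return wrapReadBufferWithCompressionMethod(
        std::move(nested), method, context->getSettingsRef().zstd_window_log_max);
}

}
```
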
@@ -52,29 +52,6 @@ public:
    // FIXME: behavior differs greately from `BufferBase::set()` and it's very confusing.
    void set(Position ptr, size_t size) { BufferBase::set(ptr, size, 0); working_buffer.resize(0); }

    /// Set buffer to given piece of memory but with certain bytes ignored from beginning.
    ///
    /// internal_buffer: |__________________|
    /// working_buffer:  |xxxxx|____________|
    ///                   ^     ^
    ///                   bytes_to_ignore
    ///
    /// It's used for lazy seek. We also have another lazy seek mechanism that uses
    /// `nextimpl_working_buffer_offset` to set offset in `next` method. It's important that we
    /// don't do double lazy seek, which means `nextimpl_working_buffer_offset` should be zero. It's
    /// useful to keep internal_buffer points to the real span of the underlying memory, because its
    /// size might be used to allocate other buffers. It's also important to have pos starts at
    /// working_buffer.begin(), because some buffers assume this condition to be true and uses
    /// offset() to check read bytes.
    void setWithBytesToIgnore(Position ptr, size_t size, size_t bytes_to_ignore)
    {
        assert(bytes_to_ignore < size);
        assert(nextimpl_working_buffer_offset == 0);
        internal_buffer = Buffer(ptr, ptr + size);
        working_buffer = Buffer(ptr + bytes_to_ignore, ptr + size);
        pos = ptr + bytes_to_ignore;
    }

    /** read next data and fill a buffer with it; set position to the beginning;
      * return `false` in case of end, `true` otherwise; throw an exception, if something is wrong
      */

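The diagram in the removed helper is easy to misread, so here is a small self-contained toy model (plain C++, not ClickHouse's `BufferBase`) of the invariant it documents: `internal_buffer` keeps spanning the whole allocation, `working_buffer` skips the ignored prefix, and `pos` starts at `working_buffer.begin()` so that `offset()` is zero right after the lazy seek:

```cpp
#include <cassert>
#include <cstddef>
#include <iostream>
#include <vector>

// Toy model of the invariant described above (illustration only).
struct ToyBuffer
{
    char * int_begin = nullptr;
    char * int_end = nullptr;
    char * work_begin = nullptr;
    char * work_end = nullptr;
    char * pos = nullptr;

    void setWithBytesToIgnore(char * ptr, size_t size, size_t bytes_to_ignore)
    {
        assert(bytes_to_ignore < size);
        int_begin = ptr;                      // internal buffer: the whole allocation
        int_end = ptr + size;
        work_begin = ptr + bytes_to_ignore;   // working buffer: skips the ignored prefix
        work_end = ptr + size;
        pos = work_begin;                     // reads start at working_buffer.begin()
    }

    size_t offset() const { return static_cast<size_t>(pos - work_begin); }
    size_t available() const { return static_cast<size_t>(work_end - pos); }
};

int main()
{
    std::vector<char> memory(16, 'x');
    ToyBuffer buf;
    buf.setWithBytesToIgnore(memory.data(), memory.size(), 5);
    // The internal buffer still covers all 16 bytes, but reads begin 5 bytes in.
    std::cout << buf.offset() << ' ' << buf.available() << '\n'; // prints: 0 11
    return 0;
}
```
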
@@ -294,7 +294,7 @@ void WriteBufferFromS3::writePart()
        ++num_finished_bg_tasks;

        /// Notification under mutex is important here.
        /// Othervies, WriteBuffer could be destroyed in between
        /// Otherwise, WriteBuffer could be destroyed in between
        /// Releasing lock and condvar notification.
        bg_tasks_condvar.notify_one();
    }

@@ -8,7 +8,7 @@ namespace ErrorCodes
    extern const int ZSTD_DECODER_FAILED;
}

ZstdInflatingReadBuffer::ZstdInflatingReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t buf_size, char * existing_memory, size_t alignment)
ZstdInflatingReadBuffer::ZstdInflatingReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t buf_size, char * existing_memory, size_t alignment, int zstd_window_log_max)
    : CompressedReadBufferWrapper(std::move(in_), buf_size, existing_memory, alignment)
{
    dctx = ZSTD_createDCtx();
@@ -19,6 +19,12 @@ ZstdInflatingReadBuffer::ZstdInflatingReadBuffer(std::unique_ptr<ReadBuffer> in_
    {
        throw Exception(ErrorCodes::ZSTD_DECODER_FAILED, "zstd_stream_decoder init failed: zstd version: {}", ZSTD_VERSION_STRING);
    }

    size_t ret = ZSTD_DCtx_setParameter(dctx, ZSTD_d_windowLogMax, zstd_window_log_max);
    if (ZSTD_isError(ret))
    {
        throw Exception(ErrorCodes::ZSTD_DECODER_FAILED, "zstd_stream_decoder init failed: {}", ZSTD_getErrorName(ret));
    }
}

ZstdInflatingReadBuffer::~ZstdInflatingReadBuffer()

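The constructor change above is a thin wrapper around one libzstd call. A minimal standalone sketch (assuming libzstd >= 1.4.0, linked with `-lzstd`; not ClickHouse code) of what `ZSTD_d_windowLogMax` does:

```cpp
// Standalone illustration of the window-log limit set in the constructor above.
#include <zstd.h>
#include <cstdio>

int main()
{
    ZSTD_DCtx * dctx = ZSTD_createDCtx();

    // Reject any frame whose window is larger than 2^20 bytes (1 MiB).
    // Such frames fail to decompress instead of forcing a large allocation.
    size_t ret = ZSTD_DCtx_setParameter(dctx, ZSTD_d_windowLogMax, 20);
    if (ZSTD_isError(ret))
        std::fprintf(stderr, "setParameter failed: %s\n", ZSTD_getErrorName(ret));

    // A subsequent ZSTD_decompressStream(dctx, &out, &in) would now return an
    // error for frames that need a window log > 20, which ClickHouse surfaces
    // as ZSTD_DECODER_FAILED.

    ZSTD_freeDCtx(dctx);
    return 0;
}
```

This is also why the new 02293 test later in this diff expects `zstd_window_log_max = 20` to fail with ZSTD_DECODER_FAILED while 21 succeeds: the test file was presumably compressed with a window larger than 2^20 bytes.
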
@@ -20,7 +20,8 @@ public:
        std::unique_ptr<ReadBuffer> in_,
        size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
        char * existing_memory = nullptr,
        size_t alignment = 0);
        size_t alignment = 0,
        int zstd_window_log_max = 0);

    ~ZstdInflatingReadBuffer() override;

@@ -47,10 +47,26 @@ RemoteInserter::RemoteInserter(
        }
    }

    Settings settings = settings_;
    /// With current protocol it is impossible to avoid deadlock in case of send_logs_level!=none.
    ///
    /// RemoteInserter send Data blocks/packets to the remote shard,
    /// while remote side can send Log packets to the initiator (this RemoteInserter instance).
    ///
    /// But it is not enough to pull Log packets just before writing the next block
    /// since there is no way to ensure that all Log packets had been consumed.
    ///
    /// And if enough Log packets will be queued by the remote side,
    /// it will wait send_timeout until initiator will consume those packets,
    /// while initiator already starts writing Data blocks,
    /// and will not consume Log packets.
    ///
    /// So that is why send_logs_level had been disabled here.
    settings.send_logs_level = "none";
    /** Send query and receive "header", that describes table structure.
      * Header is needed to know, what structure is required for blocks to be passed to 'write' method.
      */
    connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, &settings_, &modified_client_info, false, {});
    connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, &settings, &modified_client_info, false, {});

    while (true)
    {

@@ -649,7 +649,7 @@ void HTTPHandler::processQuery(
    /// Request body can be compressed using algorithm specified in the Content-Encoding header.
    String http_request_compression_method_str = request.get("Content-Encoding", "");
    auto in_post = wrapReadBufferWithCompressionMethod(
        wrapReadBufferReference(request.getStream()), chooseCompressionMethod({}, http_request_compression_method_str));
        wrapReadBufferReference(request.getStream()), chooseCompressionMethod({}, http_request_compression_method_str), context->getSettingsRef().zstd_window_log_max);

    /// The data can also be compressed using incompatible internal algorithm. This is indicated by
    /// 'decompress' query parameter.

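For context, the body that reaches this code path is simply a zstd frame produced by the client. A hedged sketch (plain libzstd, not ClickHouse code) of preparing such a body, which a client could then send with a `Content-Encoding: zstd` header and which the server decompresses under the `zstd_window_log_max` limit applied above:

```cpp
// Sketch only: compress a request body with libzstd before POSTing it.
#include <zstd.h>
#include <cstdio>
#include <string>
#include <vector>

int main()
{
    std::string body = "INSERT INTO t VALUES (1)\n";
    std::vector<char> compressed(ZSTD_compressBound(body.size()));

    // Level 3 is an arbitrary, commonly used default for this illustration.
    size_t n = ZSTD_compress(compressed.data(), compressed.size(), body.data(), body.size(), 3);
    if (ZSTD_isError(n))
    {
        std::fprintf(stderr, "%s\n", ZSTD_getErrorName(n));
        return 1;
    }
    std::printf("compressed %zu bytes into %zu\n", body.size(), n);
    return 0;
}
```
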
@@ -134,7 +134,9 @@ bool AsynchronousReadBufferFromHDFS::nextImpl()
        prefetch_buffer.swap(memory);

        /// Adjust the working buffer so that it ignores `offset` bytes.
        setWithBytesToIgnore(memory.data(), size, offset);
        internal_buffer = Buffer(memory.data(), memory.data() + memory.size());
        working_buffer = Buffer(memory.data() + offset, memory.data() + size);
        pos = working_buffer.begin();
    }
    else
    {
@@ -150,7 +152,9 @@ bool AsynchronousReadBufferFromHDFS::nextImpl()
        if (size)
        {
            /// Adjust the working buffer so that it ignores `offset` bytes.
            setWithBytesToIgnore(memory.data(), size, offset);
            internal_buffer = Buffer(memory.data(), memory.data() + memory.size());
            working_buffer = Buffer(memory.data() + offset, memory.data() + size);
            pos = working_buffer.begin();
        }
    }

@@ -197,8 +197,9 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
        if (it == paths.end())
            return nullptr;
        auto compression = chooseCompressionMethod(*it, compression_method);
        auto zstd_window_log_max = ctx->getSettingsRef().zstd_window_log_max;
        return wrapReadBufferWithCompressionMethod(
            std::make_unique<ReadBufferFromHDFS>(uri_without_path, *it++, ctx->getGlobalContext()->getConfigRef()), compression);
            std::make_unique<ReadBufferFromHDFS>(uri_without_path, *it++, ctx->getGlobalContext()->getConfigRef()), compression, zstd_window_log_max);
    };
    return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, paths.size() > 1, ctx);
}
@@ -327,7 +328,8 @@ bool HDFSSource::initialize()
    const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(current_path);

    auto compression = chooseCompressionMethod(path_from_uri, storage->compression_method);
    read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri_without_path, path_from_uri, getContext()->getGlobalContext()->getConfigRef()), compression);
    const auto zstd_window_log_max = getContext()->getSettingsRef().zstd_window_log_max;
    read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri_without_path, path_from_uri, getContext()->getGlobalContext()->getConfigRef()), compression, zstd_window_log_max);

    auto input_format = getContext()->getInputFormat(storage->format_name, *read_buf, block_for_format, max_block_size);

@@ -208,7 +208,8 @@ std::unique_ptr<ReadBuffer> createReadBuffer(
        in.setProgressCallback(context);
    }

    return wrapReadBufferWithCompressionMethod(std::move(nested_buffer), method);
    auto zstd_window_log_max = context->getSettingsRef().zstd_window_log_max;
    return wrapReadBufferWithCompressionMethod(std::move(nested_buffer), method, zstd_window_log_max);
}

}

@@ -434,7 +434,8 @@ bool StorageS3Source::initialize()

    file_path = fs::path(bucket) / current_key;

    read_buf = wrapReadBufferWithCompressionMethod(createS3ReadBuffer(current_key), chooseCompressionMethod(current_key, compression_hint));
    auto zstd_window_log_max = getContext()->getSettingsRef().zstd_window_log_max;
    read_buf = wrapReadBufferWithCompressionMethod(createS3ReadBuffer(current_key), chooseCompressionMethod(current_key, compression_hint), zstd_window_log_max);

    auto input_format = getContext()->getInputFormat(format, *read_buf, sample_block, max_block_size, format_settings);
    QueryPipelineBuilder builder;
@@ -1170,10 +1171,12 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
            read_keys_in_distributed_processing->push_back(key);

        first = false;
        const auto zstd_window_log_max = ctx->getSettingsRef().zstd_window_log_max;
        return wrapReadBufferWithCompressionMethod(
            std::make_unique<ReadBufferFromS3>(
                s3_configuration.client, s3_configuration.uri.bucket, key, s3_configuration.uri.version_id, s3_configuration.rw_settings.max_single_read_retries, ctx->getReadSettings()),
            chooseCompressionMethod(key, compression_method));
            chooseCompressionMethod(key, compression_method),
            zstd_window_log_max);
    };

    return readSchemaFromFormat(format, format_settings, read_buffer_iterator, is_key_with_globs, ctx);

@@ -350,7 +350,8 @@ namespace
                    std::move(read_buffer_factory),
                    threadPoolCallbackRunner(IOThreadPool::get()),
                    download_threads),
                chooseCompressionMethod(request_uri.getPath(), compression_method));
                chooseCompressionMethod(request_uri.getPath(), compression_method),
                settings.zstd_window_log_max);
            }
        }
        catch (const Poco::Exception & e)
@@ -381,7 +382,8 @@ namespace
                    delay_initialization,
                    /* use_external_buffer */ false,
                    /* skip_url_not_found_error */ skip_url_not_found_error),
                chooseCompressionMethod(request_uri.getPath(), compression_method));
                chooseCompressionMethod(request_uri.getPath(), compression_method),
                settings.zstd_window_log_max);
            }
            catch (...)
            {

@@ -1,3 +1,4 @@
-- Tags: distributed

SELECT quantilesTiming(0.1, 0.5, 0.9)(dummy) FROM remote('127.0.0.{2,3}', system, one) GROUP BY 1 WITH TOTALS
SET enable_positional_arguments = 0;
SELECT quantilesTiming(0.1, 0.5, 0.9)(dummy) FROM remote('127.0.0.{2,3}', system, one) GROUP BY 1 WITH TOTALS;

@@ -1,6 +1,8 @@
DROP TABLE IF EXISTS test_00209;
CREATE TABLE test_00209 (x UInt8) ENGINE = Log;

SET enable_positional_arguments = 0;

INSERT INTO test_00209 SELECT 1 AS x;
INSERT INTO test_00209 SELECT 1 AS x SETTINGS extremes = 1;
INSERT INTO test_00209 SELECT 1 AS x GROUP BY 1 WITH TOTALS;

@@ -28,15 +28,15 @@ do
    $CLICKHOUSE_CLIENT -q "CREATE TABLE test ENGINE=$engine AS SELECT number + 100 AS n, 0 AS test FROM numbers(50)" 2>&1| grep -Ev "Removing leftovers from table|removed by another replica"
    $CLICKHOUSE_CLIENT -q "select count(), sum(n), sum(test) from test"
    if [[ $engine == *"ReplicatedMergeTree"* ]]; then
        $CLICKHOUSE_CLIENT -q "ALTER TABLE test
        $CLICKHOUSE_CLIENT --enable_positional_arguments 0 -q "ALTER TABLE test
            UPDATE test = (SELECT groupArray(id) FROM t1 GROUP BY 1)[n - 99] WHERE 1" 2>&1| grep -Fa "DB::Exception: " | grep -Fv "statement with subquery may be nondeterministic"
        $CLICKHOUSE_CLIENT --allow_nondeterministic_mutations=1 --mutations_sync=1 -q "ALTER TABLE test
        $CLICKHOUSE_CLIENT --enable_positional_arguments 0 --allow_nondeterministic_mutations=1 --mutations_sync=1 -q "ALTER TABLE test
            UPDATE test = (SELECT groupArray(id) FROM t1 GROUP BY 1)[n - 99] WHERE 1"
    elif [[ $engine == *"Join"* ]]; then
        $CLICKHOUSE_CLIENT -q "ALTER TABLE test
        $CLICKHOUSE_CLIENT --enable_positional_arguments 0 -q "ALTER TABLE test
            UPDATE test = (SELECT groupArray(id) FROM t1 GROUP BY 1)[n - 99] WHERE 1" 2>&1| grep -Fa "DB::Exception: " | grep -Fv "Table engine Join supports only DELETE mutations"
    else
        $CLICKHOUSE_CLIENT --mutations_sync=1 -q "ALTER TABLE test
        $CLICKHOUSE_CLIENT --enable_positional_arguments 0 --mutations_sync=1 -q "ALTER TABLE test
            UPDATE test = (SELECT groupArray(id) FROM t1 GROUP BY 1)[n - 99] WHERE 1"
    fi
    $CLICKHOUSE_CLIENT -q "select count(), sum(n), sum(test) from test"

@@ -1,6 +1,7 @@
DROP TABLE IF EXISTS data_01283;

set remote_filesystem_read_method='read';
set remote_filesystem_read_method = 'read';
set local_filesystem_read_method = 'pread';

CREATE TABLE data_01283 engine=MergeTree()
ORDER BY key

@ -1,6 +1,7 @@
|
||||
drop table if exists table_01323_many_parts;
|
||||
|
||||
set remote_filesystem_read_method='read';
|
||||
set remote_filesystem_read_method = 'read';
|
||||
set local_filesystem_read_method = 'pread';
|
||||
|
||||
create table table_01323_many_parts (x UInt64) engine = MergeTree order by x partition by x % 100;
|
||||
set max_partitions_per_insert_block = 100;
|
||||
|
@@ -32,6 +32,7 @@ INSERT INTO select_final SELECT toDate('2000-01-01'), number, '' FROM numbers(50
OPTIMIZE TABLE select_final FINAL;

SET remote_filesystem_read_method = 'read';
SET local_filesystem_read_method = 'pread';

SELECT max(x) FROM select_final FINAL;

@@ -4,7 +4,9 @@ drop table if exists data_01641;

-- Disable cache for s3 storage tests because it increases memory usage.
set enable_filesystem_cache=0;
set remote_filesystem_read_method='read';

set remote_filesystem_read_method = 'read';
set local_filesystem_read_method = 'pread';

create table data_01641 (key Int, value String) engine=MergeTree order by (key, repeat(value, 40)) settings old_parts_lifetime=0, min_bytes_for_wide_part=0;

@@ -40,6 +40,7 @@ select toStartOfMinute(datetime) dt_m, domain, sum(retry_count) / sum(duration),
select toStartOfHour(toStartOfMinute(datetime)) dt_h, uniqHLL12(x_id), uniqHLL12(y_id) from projection_test group by dt_h order by dt_h;

-- found by fuzzer
SET enable_positional_arguments = 0;
SELECT 2, -1 FROM projection_test PREWHERE domain_alias = 1. WHERE domain = NULL GROUP BY -9223372036854775808 ORDER BY countIf(first_time = 0) / count(-2147483649) DESC NULLS LAST, 1048576 DESC NULLS LAST;

drop table if exists projection_test;

@@ -1,3 +1,5 @@
-- Tags: no-backward-compatibility-check:22.5.1

DROP TABLE IF EXISTS partslost_0;
DROP TABLE IF EXISTS partslost_1;
DROP TABLE IF EXISTS partslost_2;

@@ -23,7 +23,8 @@ AND current_database = currentDatabase()
ORDER BY query_start_time DESC
LIMIT 1;

SET remote_filesystem_read_method='read';
set remote_filesystem_read_method = 'read';
set local_filesystem_read_method = 'pread';

SELECT 2, * FROM test LIMIT 10 FORMAT Null;

@@ -0,0 +1 @@
0

tests/queries/0_stateless/02232_dist_insert_send_logs_level_hung.sh (new executable file, 56 lines)
@@ -0,0 +1,56 @@
#!/usr/bin/env bash
# Tags: long, no-parallel, no-backward-compatibility-check
# Tag: no-parallel - too heavy
# Tag: long - too heavy

# This is the regression test for the case when the remote peer sends some logs for INSERT;
# it is easy to achieve using materialized views, with a small block size.

CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL=trace

CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

# NOTE: since we use CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL we need to apply
# --server_logs_file for every clickhouse-client invocation.
client_opts=(
    # For --send_logs_level see $CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL
    --server_logs_file /dev/null
    # we need lots of blocks to get a log entry for each of them
    --min_insert_block_size_rows 1
    # we need to terminate ASAP
    --max_block_size 1
)

$CLICKHOUSE_CLIENT "${client_opts[@]}" -nm -q "
    drop table if exists mv_02232;
    drop table if exists in_02232;
    drop table if exists out_02232;

    create table out_02232 (key Int) engine=Null();
    create table in_02232 (key Int) engine=Null();
    create materialized view mv_02232 to out_02232 as select * from in_02232;
"

insert_client_opts=(
    # Increase timeouts to avoid a timeout while trying to send the Log packet to
    # the remote side, when the socket is full.
    --send_timeout 86400
    --receive_timeout 86400
)
# 250 seconds is enough to trigger the query hang (even in a debug build)
#
# NOTE: using proper termination (via SIGINT) takes too long,
# hence timeout+KILL QUERY.
timeout 250s $CLICKHOUSE_CLIENT "${client_opts[@]}" "${insert_client_opts[@]}" -q "insert into function remote('127.2', currentDatabase(), in_02232) select * from numbers(1e6)"

# Kill the underlying query of remote() to make KILL faster
timeout 30s $CLICKHOUSE_CLIENT "${client_opts[@]}" -q "KILL QUERY WHERE Settings['log_comment'] = '$CLICKHOUSE_LOG_COMMENT' SYNC" --format Null
echo $?

$CLICKHOUSE_CLIENT "${client_opts[@]}" -nm -q "
    drop table in_02232;
    drop table mv_02232;
    drop table out_02232;
"
@@ -0,0 +1,2 @@
1
40

tests/queries/0_stateless/02293_test_zstd_window_log_max.sh (new executable file, 10 lines)
@@ -0,0 +1,10 @@
#!/usr/bin/env bash
# Tags: no-parallel

CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

# reuse the test data in 01946_test_zstd_decompression_with_escape_sequence_at_the_end_of_buffer.sh
$CLICKHOUSE_LOCAL --query "SELECT count() FROM file('$CUR_DIR/data_zstd/test_01946.zstd', JSONEachRow, 'foo String') SETTINGS zstd_window_log_max = 20" 2>&1 | grep -c "ZSTD_DECODER_FAILED"
$CLICKHOUSE_LOCAL --query "SELECT count() FROM file('$CUR_DIR/data_zstd/test_01946.zstd', JSONEachRow, 'foo String') SETTINGS zstd_window_log_max = 21"
tests/queries/0_stateless/02332_dist_insert_send_logs_level.sh (new executable file, 16 lines)

@@ -0,0 +1,16 @@
#!/usr/bin/env bash
# Tags: no-backward-compatibility-check

CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL=trace
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

$CLICKHOUSE_CLIENT --server_logs_file /dev/null -q "CREATE TABLE data_02332 (key Int) Engine=Null()"
# If the ClickHouse server forwards logs from the remote nodes, then it will definitely have the following message in the log:
#
# <Debug> executeQuery: (from 127.0.0.1:53440, initial_query_id: fc1f7dbd-845b-4142-9306-158ddd564e61) INSERT INTO default.data (key) VALUES (stage: Complete)
#
# And if the server forwards logs, then the query may hang.
$CLICKHOUSE_CLIENT -q "INSERT INTO FUNCTION remote('127.2', currentDatabase(), data_02332) SELECT * FROM numbers(10)" |& grep 'executeQuery.*initial_query_id.*INSERT INTO'
exit 0

@@ -462,6 +462,7 @@ unencrypted
unixodbc
url
userspace
userver
utils
uuid
variadic