Merge branch 'master' into refactor_statistics

This commit is contained in:
JackyWoo 2024-09-11 11:27:09 +08:00
commit 14ceafb534
37 changed files with 328 additions and 64 deletions

View File

@ -124,6 +124,8 @@ function setup_logs_replication
check_logs_credentials || return 0
__set_connection_args
echo "My hostname is ${HOSTNAME}"
echo 'Create all configured system logs'
clickhouse-client --query "SYSTEM FLUSH LOGS"
@ -184,7 +186,12 @@ function setup_logs_replication
/^TTL /d
')
echo -e "Creating remote destination table ${table}_${hash} with statement:\n${statement}" >&2
echo -e "Creating remote destination table ${table}_${hash} with statement:" >&2
echo "::group::${table}"
# This is the only way a big "$statement" can be printed without causing an EAGAIN error:
# cat: write error: Resource temporarily unavailable
echo "$statement" | cat
echo "::endgroup::"
echo "$statement" | clickhouse-client --database_replicated_initial_query_timeout_sec=10 \
--distributed_ddl_task_timeout=30 --distributed_ddl_output_mode=throw_only_active \

View File

@ -55,10 +55,29 @@ void CompressedWriteBuffer::nextImpl()
out.write(compressed_buffer.data(), compressed_size);
}
/// Increase buffer size for next data if adaptive buffer size is used and nextImpl was called because of end of buffer.
if (!available() && use_adaptive_buffer_size && memory.size() < adaptive_buffer_max_size)
{
memory.resize(std::min(memory.size() * 2, adaptive_buffer_max_size));
BufferBase::set(memory.data(), memory.size(), 0);
}
}
CompressedWriteBuffer::CompressedWriteBuffer(WriteBuffer & out_, CompressionCodecPtr codec_, size_t buf_size)
: BufferWithOwnMemory<WriteBuffer>(buf_size), out(out_), codec(std::move(codec_))
/// Finalization step: writes out whatever is still pending via one last next() call.
void CompressedWriteBuffer::finalizeImpl()
{
/// Don't try to resize buffer in nextImpl.
/// The flag must be cleared before next(): nextImpl only grows the buffer while
/// use_adaptive_buffer_size is set, and growing it during the final flush is pointless.
use_adaptive_buffer_size = false;
next();
}
/// Constructs a compressing write buffer that stores `out_` as its destination and takes ownership of `codec_`.
/// If use_adaptive_buffer_size_ is true, the internal buffer is created with
/// adaptive_buffer_initial_size bytes and buf_size is remembered as adaptive_buffer_max_size;
/// otherwise the internal buffer is created with buf_size bytes right away.
CompressedWriteBuffer::CompressedWriteBuffer(
WriteBuffer & out_, CompressionCodecPtr codec_, size_t buf_size, bool use_adaptive_buffer_size_, size_t adaptive_buffer_initial_size)
: BufferWithOwnMemory<WriteBuffer>(use_adaptive_buffer_size_ ? adaptive_buffer_initial_size : buf_size)
, out(out_)
, codec(std::move(codec_))
, use_adaptive_buffer_size(use_adaptive_buffer_size_)
, adaptive_buffer_max_size(buf_size)
{
}

View File

@ -19,7 +19,9 @@ public:
explicit CompressedWriteBuffer(
WriteBuffer & out_,
CompressionCodecPtr codec_ = CompressionCodecFactory::instance().getDefaultCodec(),
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
bool use_adaptive_buffer_size_ = false,
size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE);
~CompressedWriteBuffer() override;
@ -45,10 +47,17 @@ public:
private:
void nextImpl() override;
void finalizeImpl() override;
WriteBuffer & out;
CompressionCodecPtr codec;
/// If true, the size of internal buffer will be exponentially increased up to
/// adaptive_buffer_max_size after each nextImpl call. It can be used to avoid
/// large buffer allocation when actual size of written data is small.
bool use_adaptive_buffer_size;
size_t adaptive_buffer_max_size;
PODArray<char> compressed_buffer;
};

View File

@ -20,6 +20,9 @@ static constexpr auto DBMS_DEFAULT_POLL_INTERVAL = 10;
/// The size of the I/O buffer by default.
static constexpr auto DBMS_DEFAULT_BUFFER_SIZE = 1048576ULL;
/// The initial size of adaptive I/O buffer by default.
static constexpr auto DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE = 16384ULL;
static constexpr auto PADDING_FOR_SIMD = 64;
/** Which blocks by default read the data (by number of rows).

View File

@ -148,6 +148,7 @@ namespace DB
M(Bool, storage_metadata_write_full_object_key, false, "Write disk metadata files with VERSION_FULL_OBJECT_KEY format", 0) \
M(UInt64, max_materialized_views_count_for_table, 0, "A limit on the number of materialized views attached to a table.", 0) \
M(UInt32, max_database_replicated_create_table_thread_pool_size, 1, "The number of threads to create tables during replica recovery in DatabaseReplicated. Zero means number of threads equal number of cores.", 0) \
M(Bool, database_replicated_allow_detach_permanently, true, "Allow detaching tables permanently in Replicated databases", 0) \
M(Bool, format_alter_operations_with_parentheses, false, "If enabled, each operation in alter queries will be surrounded with parentheses in formatted queries to make them less ambiguous.", 0) \
M(String, default_replica_path, "/clickhouse/tables/{uuid}/{shard}", "The path to the table in ZooKeeper", 0) \
M(String, default_replica_name, "{replica}", "The replica name in ZooKeeper", 0) \

View File

@ -420,6 +420,21 @@ bool ISerialization::isEphemeralSubcolumn(const DB::ISerialization::SubstreamPat
return path[last_elem].type == Substream::VariantElementNullMap;
}
bool ISerialization::isDynamicSubcolumn(const DB::ISerialization::SubstreamPath & path, size_t prefix_len)
{
    /// An empty prefix, or one that is longer than the path itself, never describes a dynamic subcolumn.
    const bool prefix_is_usable = prefix_len != 0 && prefix_len <= path.size();
    if (!prefix_is_usable)
        return false;

    /// Substream kinds that mark the data or structure streams of Dynamic and Object.
    const auto is_dynamic_kind = [](const auto kind)
    {
        return kind == SubstreamType::DynamicData || kind == SubstreamType::DynamicStructure
            || kind == SubstreamType::ObjectData || kind == SubstreamType::ObjectStructure;
    };

    /// The stream is dynamic as soon as any element within the prefix has one of those kinds.
    size_t pos = 0;
    while (pos != prefix_len)
    {
        if (is_dynamic_kind(path[pos].type))
            return true;
        ++pos;
    }

    return false;
}
ISerialization::SubstreamData ISerialization::createFromPath(const SubstreamPath & path, size_t prefix_len)
{
assert(prefix_len <= path.size());

View File

@ -457,6 +457,9 @@ public:
/// for writing/reading data. For example, it's a null-map subcolumn of Variant type (it's always constructed from discriminators).
static bool isEphemeralSubcolumn(const SubstreamPath & path, size_t prefix_len);
/// Returns true if stream with specified path corresponds to dynamic subcolumn.
static bool isDynamicSubcolumn(const SubstreamPath & path, size_t prefix_len);
protected:
template <typename State, typename StatePtr>
State * checkAndGetState(const StatePtr & state) const;

View File

@ -63,6 +63,7 @@ namespace ErrorCodes
extern const int NO_ACTIVE_REPLICAS;
extern const int CANNOT_GET_REPLICATED_DATABASE_SNAPSHOT;
extern const int CANNOT_RESTORE_TABLE;
extern const int SUPPORT_IS_DISABLED;
}
static constexpr const char * REPLICATED_DATABASE_MARK = "DatabaseReplicated";
@ -1741,6 +1742,9 @@ void DatabaseReplicated::detachTablePermanently(ContextPtr local_context, const
{
waitDatabaseStarted();
if (!local_context->getServerSettings().database_replicated_allow_detach_permanently)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Support for DETACH TABLE PERMANENTLY is disabled");
auto txn = local_context->getZooKeeperMetadataTransaction();
assert(!ddl_worker->isCurrentlyActive() || txn);
if (txn && txn->isInitialQuery())

View File

@ -339,7 +339,15 @@ DiskLocal::writeFile(const String & path, size_t buf_size, WriteMode mode, const
{
int flags = (mode == WriteMode::Append) ? (O_APPEND | O_CREAT | O_WRONLY) : -1;
return std::make_unique<WriteBufferFromFile>(
fs::path(disk_path) / path, buf_size, flags, settings.local_throttler);
fs::path(disk_path) / path,
buf_size,
flags,
settings.local_throttler,
0666,
nullptr,
0,
settings.use_adaptive_write_buffer,
settings.adaptive_write_buffer_initial_size);
}
std::vector<String> DiskLocal::getBlobPath(const String & path) const

View File

@ -59,7 +59,7 @@ WriteBufferFromAzureBlobStorage::WriteBufferFromAzureBlobStorage(
const WriteSettings & write_settings_,
std::shared_ptr<const AzureBlobStorage::RequestSettings> settings_,
ThreadPoolCallbackRunnerUnsafe<void> schedule_)
: WriteBufferFromFileBase(buf_size_, nullptr, 0)
: WriteBufferFromFileBase(std::min(buf_size_, static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE)), nullptr, 0)
, log(getLogger("WriteBufferFromAzureBlobStorage"))
, buffer_allocation_policy(createBufferAllocationPolicy(*settings_))
, max_single_part_upload_size(settings_->max_single_part_upload_size)
@ -244,11 +244,21 @@ void WriteBufferFromAzureBlobStorage::allocateBuffer()
buffer_allocation_policy->nextBuffer();
chassert(0 == hidden_size);
auto size = buffer_allocation_policy->getBufferSize();
/// First buffer was already allocated in BufferWithOwnMemory constructor with buffer size provided in constructor.
/// It will be reallocated in subsequent nextImpl calls up to the desired buffer size from buffer_allocation_policy.
if (buffer_allocation_policy->getBufferNumber() == 1)
size = std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), size);
{
/// Reduce memory size if initial size was larger than the desired size from buffer_allocation_policy.
/// Usually it doesn't happen but we have it in unit tests.
if (memory.size() > buffer_allocation_policy->getBufferSize())
{
memory.resize(buffer_allocation_policy->getBufferSize());
WriteBuffer::set(memory.data(), memory.size());
}
return;
}
auto size = buffer_allocation_policy->getBufferSize();
memory = Memory(size);
WriteBuffer::set(memory.data(), memory.size());
}

View File

@ -289,7 +289,7 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NO
return std::make_unique<WriteBufferFromAzureBlobStorage>(
client.get(),
object.remote_path,
buf_size,
write_settings.use_adaptive_write_buffer ? write_settings.adaptive_write_buffer_initial_size : buf_size,
patchSettings(write_settings),
settings.get(),
std::move(scheduler));

View File

@ -282,7 +282,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
client.get(),
uri.bucket,
object.remote_path,
buf_size,
write_settings.use_adaptive_write_buffer ? write_settings.adaptive_write_buffer_initial_size : buf_size,
request_settings,
std::move(blob_storage_log),
attributes,

View File

@ -2,7 +2,7 @@
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <Columns/ColumnLowCardinality.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Columns/ColumnSparse.h>
namespace DB
{
@ -18,11 +18,6 @@ public:
return std::make_shared<FunctionMaterialize>();
}
bool useDefaultImplementationForNulls() const override
{
return false;
}
/// Get the function name.
String getName() const override
{
@ -34,8 +29,16 @@ public:
return true;
}
/// Every default-implementation hook below returns false: for Nulls, Nothing, constants,
/// LowCardinality and Sparse columns.
/// NOTE(review): presumably this lets executeImpl receive the argument column in its original
/// representation so it can do the conversion itself — confirm against the IFunction defaults.
bool useDefaultImplementationForNulls() const override { return false; }
bool useDefaultImplementationForNothing() const override { return false; }
bool useDefaultImplementationForConstants() const override { return false; }
bool useDefaultImplementationForLowCardinalityColumns() const override { return false; }
bool useDefaultImplementationForSparseColumns() const override { return false; }
bool isSuitableForConstantFolding() const override { return false; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
@ -52,7 +55,7 @@ public:
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
{
return arguments[0].column->convertToFullColumnIfConst();
return recursiveRemoveSparse(arguments[0].column->convertToFullColumnIfConst());
}
bool hasInformationAboutMonotonicity() const override { return true; }

View File

@ -36,7 +36,7 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
/// Case when bucket name represented in domain name of S3 URL.
/// E.g. (https://bucket-name.s3.region.amazonaws.com/key)
/// https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html#virtual-hosted-style-access
static const RE2 virtual_hosted_style_pattern(R"((.+)\.(s3express[\-a-z0-9]+|s3|cos|obs|oss|eos)([.\-][a-z0-9\-.:]+))");
static const RE2 virtual_hosted_style_pattern(R"(([^.]+)\.(s3express[\-a-z0-9]+|s3|cos|obs|.*oss[^\/]*|eos)([.\-][a-z0-9\-.:]+))");
/// Case when AWS Private Link Interface is being used
/// E.g. (bucket.vpce-07a1cd78f1bd55c5f-j3a3vg6w.s3.us-east-1.vpce.amazonaws.com/bucket-name/key)

View File

@ -32,8 +32,10 @@ WriteBufferFromFile::WriteBufferFromFile(
ThrottlerPtr throttler_,
mode_t mode,
char * existing_memory,
size_t alignment)
: WriteBufferFromFileDescriptor(-1, buf_size, existing_memory, throttler_, alignment, file_name_)
size_t alignment,
bool use_adaptive_buffer_size_,
size_t adaptive_buffer_initial_size)
: WriteBufferFromFileDescriptor(-1, buf_size, existing_memory, throttler_, alignment, file_name_, use_adaptive_buffer_size_, adaptive_buffer_initial_size)
{
ProfileEvents::increment(ProfileEvents::FileOpen);
@ -66,8 +68,10 @@ WriteBufferFromFile::WriteBufferFromFile(
size_t buf_size,
ThrottlerPtr throttler_,
char * existing_memory,
size_t alignment)
: WriteBufferFromFileDescriptor(fd_, buf_size, existing_memory, throttler_, alignment, original_file_name)
size_t alignment,
bool use_adaptive_buffer_size_,
size_t adaptive_buffer_initial_size)
: WriteBufferFromFileDescriptor(fd_, buf_size, existing_memory, throttler_, alignment, original_file_name, use_adaptive_buffer_size_, adaptive_buffer_initial_size)
{
fd_ = -1;
}

View File

@ -36,7 +36,9 @@ public:
ThrottlerPtr throttler_ = {},
mode_t mode = 0666,
char * existing_memory = nullptr,
size_t alignment = 0);
size_t alignment = 0,
bool use_adaptive_buffer_size_ = false,
size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE);
/// Use pre-opened file descriptor.
explicit WriteBufferFromFile(
@ -45,7 +47,9 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
ThrottlerPtr throttler_ = {},
char * existing_memory = nullptr,
size_t alignment = 0);
size_t alignment = 0,
bool use_adaptive_buffer_size_ = false,
size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE);
~WriteBufferFromFile() override;

View File

@ -83,6 +83,13 @@ void WriteBufferFromFileDescriptor::nextImpl()
ProfileEvents::increment(ProfileEvents::DiskWriteElapsedMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteBytes, bytes_written);
/// Increase buffer size for next data if adaptive buffer size is used and nextImpl was called because of end of buffer.
if (!available() && use_adaptive_buffer_size && memory.size() < adaptive_max_buffer_size)
{
memory.resize(std::min(memory.size() * 2, adaptive_max_buffer_size));
BufferBase::set(memory.data(), memory.size(), 0);
}
}
/// NOTE: This class can be used as a very low-level building block, for example
@ -94,11 +101,15 @@ WriteBufferFromFileDescriptor::WriteBufferFromFileDescriptor(
char * existing_memory,
ThrottlerPtr throttler_,
size_t alignment,
std::string file_name_)
: WriteBufferFromFileBase(buf_size, existing_memory, alignment)
std::string file_name_,
bool use_adaptive_buffer_size_,
size_t adaptive_buffer_initial_size)
: WriteBufferFromFileBase(use_adaptive_buffer_size_ ? adaptive_buffer_initial_size : buf_size, existing_memory, alignment)
, fd(fd_)
, throttler(throttler_)
, file_name(std::move(file_name_))
, use_adaptive_buffer_size(use_adaptive_buffer_size_)
, adaptive_max_buffer_size(buf_size)
{
}
@ -124,6 +135,7 @@ void WriteBufferFromFileDescriptor::finalizeImpl()
return;
}
use_adaptive_buffer_size = false;
next();
}

View File

@ -18,7 +18,9 @@ public:
char * existing_memory = nullptr,
ThrottlerPtr throttler_ = {},
size_t alignment = 0,
std::string file_name_ = "");
std::string file_name_ = "",
bool use_adaptive_buffer_size_ = false,
size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE);
/** Could be used before initialization if needed 'fd' was not passed to constructor.
* It's not possible to change 'fd' during work.
@ -56,6 +58,12 @@ protected:
/// If file has name contains filename, otherwise contains string "(fd=...)"
std::string file_name;
/// If true, the size of internal buffer will be exponentially increased up to
/// adaptive_max_buffer_size after each nextImpl call. It can be used to avoid
/// large buffer allocation when actual size of written data is small.
bool use_adaptive_buffer_size;
size_t adaptive_max_buffer_size;
void finalizeImpl() override;
};

View File

@ -95,7 +95,7 @@ WriteBufferFromS3::WriteBufferFromS3(
std::optional<std::map<String, String>> object_metadata_,
ThreadPoolCallbackRunnerUnsafe<void> schedule_,
const WriteSettings & write_settings_)
: WriteBufferFromFileBase(buf_size_, nullptr, 0)
: WriteBufferFromFileBase(std::min(buf_size_, static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE)), nullptr, 0)
, bucket(bucket_)
, key(key_)
, request_settings(request_settings_)
@ -351,9 +351,17 @@ void WriteBufferFromS3::allocateBuffer()
buffer_allocation_policy->nextBuffer();
chassert(0 == hidden_size);
/// First buffer was already allocated in BufferWithOwnMemory constructor with the buffer size provided in the constructor.
/// It will be reallocated in subsequent nextImpl calls up to the desired buffer size from buffer_allocation_policy.
if (buffer_allocation_policy->getBufferNumber() == 1)
{
allocateFirstBuffer();
/// Reduce memory size if initial size was larger than the desired size from buffer_allocation_policy.
/// Usually it doesn't happen but we have it in unit tests.
if (memory.size() > buffer_allocation_policy->getBufferSize())
{
memory.resize(buffer_allocation_policy->getBufferSize());
WriteBuffer::set(memory.data(), memory.size());
}
return;
}
@ -361,14 +369,6 @@ void WriteBufferFromS3::allocateBuffer()
WriteBuffer::set(memory.data(), memory.size());
}
/// Allocates the very first buffer: its size is the size requested by the allocation policy,
/// capped at DBMS_DEFAULT_BUFFER_SIZE.
void WriteBufferFromS3::allocateFirstBuffer()
{
    const auto policy_size = buffer_allocation_policy->getBufferSize();
    memory = Memory(std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), policy_size));

    /// Point the working buffer at the freshly allocated memory.
    WriteBuffer::set(memory.data(), memory.size());
}
void WriteBufferFromS3::setFakeBufferWhenPreFinalized()
{
WriteBuffer::set(fake_buffer_when_prefinalized, sizeof(fake_buffer_when_prefinalized));

View File

@ -64,7 +64,6 @@ private:
void reallocateFirstBuffer();
void detachBuffer();
void allocateBuffer();
void allocateFirstBuffer();
void setFakeBufferWhenPreFinalized();
S3::UploadPartRequest getUploadRequest(size_t part_number, PartData & data);

View File

@ -54,7 +54,7 @@ inline void WriteBufferValidUTF8::putReplacement()
}
inline void WriteBufferValidUTF8::putValid(char *data, size_t len)
inline void WriteBufferValidUTF8::putValid(const char *data, size_t len)
{
if (len == 0)
return;
@ -149,9 +149,34 @@ void WriteBufferValidUTF8::finalizeImpl()
/// Write all complete sequences from buffer.
nextImpl();
/// If unfinished sequence at end, then write replacement.
/// Handle remaining bytes if we have an incomplete sequence
if (working_buffer.begin() != memory.data())
putReplacement();
{
const char * p = memory.data();
while (p < pos)
{
UInt8 len = length_of_utf8_sequence[static_cast<const unsigned char>(*p)];
if (p + len > pos)
{
/// Incomplete sequence. Skip one byte.
putReplacement();
++p;
}
else if (Poco::UTF8Encoding::isLegal(reinterpret_cast<const unsigned char *>(p), len))
{
/// Valid sequence
putValid(p, len);
p += len;
}
else
{
/// Invalid sequence, skip first byte.
putReplacement();
++p;
}
}
}
}
}

View File

@ -26,7 +26,7 @@ public:
private:
void putReplacement();
void putValid(char * data, size_t len);
void putValid(const char * data, size_t len);
void nextImpl() override;
void finalizeImpl() override;

View File

@ -24,6 +24,9 @@ struct WriteSettings
bool s3_allow_parallel_part_upload = true;
bool azure_allow_parallel_part_upload = true;
bool use_adaptive_write_buffer = false;
size_t adaptive_write_buffer_initial_size = 16 * 1024;
bool operator==(const WriteSettings & other) const = default;
};

View File

@ -204,6 +204,14 @@ TEST(S3UriTest, validPatterns)
ASSERT_EQ("", uri.version_id);
ASSERT_EQ(true, uri.is_virtual_hosted_style);
}
{
S3::URI uri("https://bucket-test.cn-beijing-internal.oss-data-acc.aliyuncs.com/cc-2zeh496zqm0g6e09g");
ASSERT_EQ("https://cn-beijing-internal.oss-data-acc.aliyuncs.com", uri.endpoint);
ASSERT_EQ("bucket-test", uri.bucket);
ASSERT_EQ("cc-2zeh496zqm0g6e09g", uri.key);
ASSERT_EQ("", uri.version_id);
ASSERT_EQ(true, uri.is_virtual_hosted_style);
}
}
TEST(S3UriTest, versionIdChecks)

View File

@ -258,7 +258,7 @@ ProcessList::insert(const String & query_, const IAST * ast, ContextMutablePtr q
query_context,
query_,
client_info,
priorities.insert(static_cast<int>(settings.priority)),
priorities.insert(settings.priority),
std::move(thread_group),
query_kind,
settings,

View File

@ -31,7 +31,7 @@ namespace DB
class QueryPriorities
{
public:
using Priority = int;
using Priority = size_t;
private:
friend struct Handle;

View File

@ -85,11 +85,11 @@ MergeTreeDataPartWriterOnDisk::Stream<false>::Stream(
marks_file_extension{marks_file_extension_},
plain_file(data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)),
plain_hashing(*plain_file),
compressor(plain_hashing, compression_codec_, max_compress_block_size_),
compressor(plain_hashing, compression_codec_, max_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size),
compressed_hashing(compressor),
marks_file(data_part_storage->writeFile(marks_path_ + marks_file_extension, 4096, query_write_settings)),
marks_hashing(*marks_file),
marks_compressor(marks_hashing, marks_compression_codec_, marks_compress_block_size_),
marks_compressor(marks_hashing, marks_compression_codec_, marks_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size),
marks_compressed_hashing(marks_compressor),
compress_marks(MarkType(marks_file_extension).compressed)
{
@ -108,7 +108,7 @@ MergeTreeDataPartWriterOnDisk::Stream<true>::Stream(
data_file_extension{data_file_extension_},
plain_file(data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)),
plain_hashing(*plain_file),
compressor(plain_hashing, compression_codec_, max_compress_block_size_),
compressor(plain_hashing, compression_codec_, max_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size),
compressed_hashing(compressor),
compress_marks(false)
{

View File

@ -177,6 +177,10 @@ void MergeTreeDataPartWriterWide::addStreams(
if (!max_compress_block_size)
max_compress_block_size = settings.max_compress_block_size;
WriteSettings query_write_settings = settings.query_write_settings;
query_write_settings.use_adaptive_write_buffer = settings.use_adaptive_write_buffer_for_dynamic_subcolumns && ISerialization::isDynamicSubcolumn(substream_path, substream_path.size());
query_write_settings.adaptive_write_buffer_initial_size = settings.adaptive_write_buffer_initial_size;
column_streams[stream_name] = std::make_unique<Stream<false>>(
stream_name,
data_part_storage,
@ -186,7 +190,7 @@ void MergeTreeDataPartWriterWide::addStreams(
max_compress_block_size,
marks_compression_codec,
settings.marks_compress_block_size,
settings.query_write_settings);
query_write_settings);
full_name_to_stream_name.emplace(full_stream_name, stream_name);
stream_name_to_full_name.emplace(stream_name, full_stream_name);

View File

@ -30,6 +30,8 @@ MergeTreeWriterSettings::MergeTreeWriterSettings(
, low_cardinality_max_dictionary_size(global_settings.low_cardinality_max_dictionary_size)
, low_cardinality_use_single_dictionary_for_part(global_settings.low_cardinality_use_single_dictionary_for_part != 0)
, use_compact_variant_discriminators_serialization(storage_settings->use_compact_variant_discriminators_serialization)
, use_adaptive_write_buffer_for_dynamic_subcolumns(storage_settings->use_adaptive_write_buffer_for_dynamic_subcolumns)
, adaptive_write_buffer_initial_size(storage_settings->adaptive_write_buffer_initial_size)
{
}

View File

@ -80,6 +80,8 @@ struct MergeTreeWriterSettings
size_t low_cardinality_max_dictionary_size;
bool low_cardinality_use_single_dictionary_for_part;
bool use_compact_variant_discriminators_serialization;
bool use_adaptive_write_buffer_for_dynamic_subcolumns;
size_t adaptive_write_buffer_initial_size;
};
}

View File

@ -99,6 +99,8 @@ struct Settings;
M(Bool, add_implicit_sign_column_constraint_for_collapsing_engine, false, "If true, add implicit constraint for sign column for CollapsingMergeTree engine.", 0) \
M(Milliseconds, sleep_before_commit_local_part_in_replicated_table_ms, 0, "For testing. Do not change it.", 0) \
M(Bool, optimize_row_order, false, "Allow reshuffling of rows during part inserts and merges to improve the compressibility of the new part", 0) \
M(Bool, use_adaptive_write_buffer_for_dynamic_subcolumns, true, "Allow to use adaptive writer buffers during writing dynamic subcolumns to reduce memory usage", 0) \
M(UInt64, adaptive_write_buffer_initial_size, 16 * 1024, "Initial size of an adaptive write buffer", 0) \
\
/* Part removal settings. */ \
M(UInt64, simultaneous_parts_removal_limit, 0, "Maximum number of parts to remove during one CleanupThread iteration (0 means unlimited).", 0) \

View File

@ -64,9 +64,7 @@ namespace
void checkAndAdjustSettings(
ObjectStorageQueueSettings & queue_settings,
ASTStorage * engine_args,
bool is_attach,
const LoggerPtr & log)
bool is_attach)
{
if (!is_attach && !queue_settings.mode.changed)
{
@ -85,16 +83,6 @@ namespace
"Setting `cleanup_interval_min_ms` ({}) must be less or equal to `cleanup_interval_max_ms` ({})",
queue_settings.cleanup_interval_min_ms, queue_settings.cleanup_interval_max_ms);
}
if (!is_attach && !queue_settings.processing_threads_num.changed)
{
queue_settings.processing_threads_num = std::max<uint32_t>(getNumberOfPhysicalCPUCores(), 16);
engine_args->settings->as<ASTSetQuery>()->changes.insertSetting(
"processing_threads_num",
queue_settings.processing_threads_num.value);
LOG_TRACE(log, "Set `processing_threads_num` to {}", queue_settings.processing_threads_num);
}
}
std::shared_ptr<ObjectStorageQueueLog> getQueueLog(const ObjectStoragePtr & storage, const ContextPtr & context, const ObjectStorageQueueSettings & table_settings)
@ -130,7 +118,7 @@ StorageObjectStorageQueue::StorageObjectStorageQueue(
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
ASTStorage * engine_args,
ASTStorage * /* engine_args */,
LoadingStrictnessLevel mode)
: IStorage(table_id_)
, WithContext(context_)
@ -154,7 +142,7 @@ StorageObjectStorageQueue::StorageObjectStorageQueue(
throw Exception(ErrorCodes::BAD_QUERY_PARAMETER, "ObjectStorageQueue url must either end with '/' or contain globs");
}
checkAndAdjustSettings(*queue_settings, engine_args, mode > LoadingStrictnessLevel::CREATE, log);
checkAndAdjustSettings(*queue_settings, mode > LoadingStrictnessLevel::CREATE);
object_storage = configuration->createObjectStorage(context_, /* is_readonly */true);
FormatFactory::instance().checkFormatName(configuration->format);

View File

@ -0,0 +1,50 @@
-- { echoOn }
SELECT dumpColumnStructure(id) FROM sparse_t;
UInt64, Sparse(size = 2, UInt64(size = 2), UInt64(size = 1))
UInt64, Sparse(size = 2, UInt64(size = 2), UInt64(size = 1))
SELECT dumpColumnStructure(materialize(id)) FROM sparse_t;
UInt64, UInt64(size = 2)
UInt64, UInt64(size = 2)
SELECT dumpColumnStructure(u) FROM sparse_t;
UInt64, Sparse(size = 2, UInt64(size = 1), UInt64(size = 0))
UInt64, Sparse(size = 2, UInt64(size = 1), UInt64(size = 0))
SELECT dumpColumnStructure(materialize(u)) FROM sparse_t;
UInt64, UInt64(size = 2)
UInt64, UInt64(size = 2)
SELECT dumpColumnStructure(s) FROM sparse_t;
String, Sparse(size = 2, String(size = 2), UInt64(size = 1))
String, Sparse(size = 2, String(size = 2), UInt64(size = 1))
SELECT dumpColumnStructure(materialize(s)) FROM sparse_t;
String, String(size = 2)
String, String(size = 2)
SELECT dumpColumnStructure(arr1) FROM sparse_t;
Array(String), Array(size = 2, UInt64(size = 2), String(size = 1))
Array(String), Array(size = 2, UInt64(size = 2), String(size = 1))
SELECT dumpColumnStructure(materialize(arr1)) FROM sparse_t;
Array(String), Array(size = 2, UInt64(size = 2), String(size = 1))
Array(String), Array(size = 2, UInt64(size = 2), String(size = 1))
SELECT dumpColumnStructure(arr2) FROM sparse_t;
Array(UInt64), Array(size = 2, UInt64(size = 2), UInt64(size = 1))
Array(UInt64), Array(size = 2, UInt64(size = 2), UInt64(size = 1))
SELECT dumpColumnStructure(materialize(arr2)) FROM sparse_t;
Array(UInt64), Array(size = 2, UInt64(size = 2), UInt64(size = 1))
Array(UInt64), Array(size = 2, UInt64(size = 2), UInt64(size = 1))
SELECT dumpColumnStructure(t) FROM sparse_t;
Tuple(a UInt64, s String), Tuple(size = 2, Sparse(size = 2, UInt64(size = 1), UInt64(size = 0)), Sparse(size = 2, String(size = 1), UInt64(size = 0)))
Tuple(a UInt64, s String), Tuple(size = 2, Sparse(size = 2, UInt64(size = 1), UInt64(size = 0)), Sparse(size = 2, String(size = 1), UInt64(size = 0)))
SELECT dumpColumnStructure(materialize(t)) FROM sparse_t;
Tuple(a UInt64, s String), Tuple(size = 2, UInt64(size = 2), String(size = 2))
Tuple(a UInt64, s String), Tuple(size = 2, UInt64(size = 2), String(size = 2))
SELECT dumpColumnStructure(t.a) FROM sparse_t;
UInt64, Sparse(size = 2, UInt64(size = 1), UInt64(size = 0))
UInt64, Sparse(size = 2, UInt64(size = 1), UInt64(size = 0))
SELECT dumpColumnStructure(materialize(t.a)) FROM sparse_t;
UInt64, UInt64(size = 2)
UInt64, UInt64(size = 2)
SELECT dumpColumnStructure(t.s) FROM sparse_t;
String, Sparse(size = 2, String(size = 1), UInt64(size = 0))
String, Sparse(size = 2, String(size = 1), UInt64(size = 0))
SELECT dumpColumnStructure(materialize(t.s)) FROM sparse_t;
String, String(size = 2)
String, String(size = 2)

View File

@ -0,0 +1,52 @@
-- Test setup: builds the table that the dumpColumnStructure / materialize queries below read from.
-- Start from a clean state in case a previous run left the table behind.
DROP TABLE IF EXISTS sparse_t;
-- One column per kind under test: plain integers, a string, two arrays and a named tuple.
-- NOTE(review): the low ratio_of_defaults_for_sparse_serialization presumably makes columns
-- that contain default values use sparse serialization — confirm against the setting's docs.
CREATE TABLE sparse_t (
id UInt64,
u UInt64,
s String,
arr1 Array(String),
arr2 Array(UInt64),
t Tuple(a UInt64, s String))
ENGINE = MergeTree ORDER BY tuple()
SETTINGS ratio_of_defaults_for_sparse_serialization = 0.1;
-- Inserts two rows (numbers(2)); the row with an odd number gets the default value in every
-- column except id.
INSERT INTO sparse_t SELECT
number,
if (number % 2 = 0, number, 0),
if (number % 2 = 0, toString(number), ''),
if (number % 2 = 0, [''], []),
if (number % 2 = 0, [0], []),
(if (number % 2 = 0, number, 0), '')
FROM numbers(2);
-- { echoOn }
SELECT dumpColumnStructure(id) FROM sparse_t;
SELECT dumpColumnStructure(materialize(id)) FROM sparse_t;
SELECT dumpColumnStructure(u) FROM sparse_t;
SELECT dumpColumnStructure(materialize(u)) FROM sparse_t;
SELECT dumpColumnStructure(s) FROM sparse_t;
SELECT dumpColumnStructure(materialize(s)) FROM sparse_t;
SELECT dumpColumnStructure(arr1) FROM sparse_t;
SELECT dumpColumnStructure(materialize(arr1)) FROM sparse_t;
SELECT dumpColumnStructure(arr2) FROM sparse_t;
SELECT dumpColumnStructure(materialize(arr2)) FROM sparse_t;
SELECT dumpColumnStructure(t) FROM sparse_t;
SELECT dumpColumnStructure(materialize(t)) FROM sparse_t;
SELECT dumpColumnStructure(t.a) FROM sparse_t;
SELECT dumpColumnStructure(materialize(t.a)) FROM sparse_t;
SELECT dumpColumnStructure(t.s) FROM sparse_t;
SELECT dumpColumnStructure(materialize(t.s)) FROM sparse_t;
-- { echoOff }
DROP TABLE IF EXISTS sparse_t
;

View File

@ -0,0 +1,16 @@
{
"meta":
[
{
"name": "unhex('f0')",
"type": "String"
}
],
"data":
[
["<22>"]
],
"rows": 1
}

View File

@ -0,0 +1,2 @@
SET output_format_write_statistics = 0;
SELECT unhex('f0') FORMAT JSONCompact;

View File

@ -1,3 +1,4 @@
SYSTEM FLUSH LOGS;
SELECT
is_initial_query,
count() AS c,