mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Merge remote-tracking branch 'origin/master' into pr-local-plan
This commit is contained in:
commit
c3dfdd41d3
2
contrib/libarchive
vendored
2
contrib/libarchive
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 0c21691b177fac5f4cceca2a1ff2ddfa5d60f51c
|
Subproject commit 313aa1fa10b657de791e3202c168a6c833bc3543
|
@ -157,7 +157,7 @@ if (TARGET ch_contrib::zlib)
|
|||||||
endif()
|
endif()
|
||||||
|
|
||||||
if (TARGET ch_contrib::zstd)
|
if (TARGET ch_contrib::zstd)
|
||||||
target_compile_definitions(_libarchive PUBLIC HAVE_ZSTD_H=1 HAVE_LIBZSTD=1 HAVE_LIBZSTD_COMPRESSOR=1)
|
target_compile_definitions(_libarchive PUBLIC HAVE_ZSTD_H=1 HAVE_LIBZSTD=1 HAVE_ZSTD_compressStream=1)
|
||||||
target_link_libraries(_libarchive PRIVATE ch_contrib::zstd)
|
target_link_libraries(_libarchive PRIVATE ch_contrib::zstd)
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
|
@ -334,13 +334,16 @@ typedef uint64_t uintmax_t;
|
|||||||
/* #undef ARCHIVE_XATTR_LINUX */
|
/* #undef ARCHIVE_XATTR_LINUX */
|
||||||
|
|
||||||
/* Version number of bsdcpio */
|
/* Version number of bsdcpio */
|
||||||
#define BSDCPIO_VERSION_STRING "3.7.0"
|
#define BSDCPIO_VERSION_STRING "3.7.4"
|
||||||
|
|
||||||
/* Version number of bsdtar */
|
/* Version number of bsdtar */
|
||||||
#define BSDTAR_VERSION_STRING "3.7.0"
|
#define BSDTAR_VERSION_STRING "3.7.4"
|
||||||
|
|
||||||
/* Version number of bsdcat */
|
/* Version number of bsdcat */
|
||||||
#define BSDCAT_VERSION_STRING "3.7.0"
|
#define BSDCAT_VERSION_STRING "3.7.4"
|
||||||
|
|
||||||
|
/* Version number of bsdunzip */
|
||||||
|
#define BSDUNZIP_VERSION_STRING "3.7.4"
|
||||||
|
|
||||||
/* Define to 1 if you have the `acl_create_entry' function. */
|
/* Define to 1 if you have the `acl_create_entry' function. */
|
||||||
/* #undef HAVE_ACL_CREATE_ENTRY */
|
/* #undef HAVE_ACL_CREATE_ENTRY */
|
||||||
@ -642,8 +645,8 @@ typedef uint64_t uintmax_t;
|
|||||||
/* Define to 1 if you have the `getgrnam_r' function. */
|
/* Define to 1 if you have the `getgrnam_r' function. */
|
||||||
#define HAVE_GETGRNAM_R 1
|
#define HAVE_GETGRNAM_R 1
|
||||||
|
|
||||||
/* Define to 1 if platform uses `optreset` to reset `getopt` */
|
/* Define to 1 if you have the `getline' function. */
|
||||||
#define HAVE_GETOPT_OPTRESET 1
|
#define HAVE_GETLINE 1
|
||||||
|
|
||||||
/* Define to 1 if you have the `getpid' function. */
|
/* Define to 1 if you have the `getpid' function. */
|
||||||
#define HAVE_GETPID 1
|
#define HAVE_GETPID 1
|
||||||
@ -750,6 +753,12 @@ typedef uint64_t uintmax_t;
|
|||||||
/* Define to 1 if you have the `pcreposix' library (-lpcreposix). */
|
/* Define to 1 if you have the `pcreposix' library (-lpcreposix). */
|
||||||
/* #undef HAVE_LIBPCREPOSIX */
|
/* #undef HAVE_LIBPCREPOSIX */
|
||||||
|
|
||||||
|
/* Define to 1 if you have the `pcre2-8' library (-lpcre2-8). */
|
||||||
|
/* #undef HAVE_LIBPCRE2 */
|
||||||
|
|
||||||
|
/* Define to 1 if you have the `pcreposix' library (-lpcre2posix). */
|
||||||
|
/* #undef HAVE_LIBPCRE2POSIX */
|
||||||
|
|
||||||
/* Define to 1 if you have the `xml2' library (-lxml2). */
|
/* Define to 1 if you have the `xml2' library (-lxml2). */
|
||||||
#define HAVE_LIBXML2 1
|
#define HAVE_LIBXML2 1
|
||||||
|
|
||||||
@ -765,9 +774,8 @@ typedef uint64_t uintmax_t;
|
|||||||
/* Define to 1 if you have the `zstd' library (-lzstd). */
|
/* Define to 1 if you have the `zstd' library (-lzstd). */
|
||||||
/* #undef HAVE_LIBZSTD */
|
/* #undef HAVE_LIBZSTD */
|
||||||
|
|
||||||
/* Define to 1 if you have the `zstd' library (-lzstd) with compression
|
/* Define to 1 if you have the ZSTD_compressStream function. */
|
||||||
support. */
|
/* #undef HAVE_ZSTD_compressStream */
|
||||||
/* #undef HAVE_LIBZSTD_COMPRESSOR */
|
|
||||||
|
|
||||||
/* Define to 1 if you have the <limits.h> header file. */
|
/* Define to 1 if you have the <limits.h> header file. */
|
||||||
#define HAVE_LIMITS_H 1
|
#define HAVE_LIMITS_H 1
|
||||||
@ -923,6 +931,9 @@ typedef uint64_t uintmax_t;
|
|||||||
/* Define to 1 if you have the <pcreposix.h> header file. */
|
/* Define to 1 if you have the <pcreposix.h> header file. */
|
||||||
/* #undef HAVE_PCREPOSIX_H */
|
/* #undef HAVE_PCREPOSIX_H */
|
||||||
|
|
||||||
|
/* Define to 1 if you have the <pcre2posix.h> header file. */
|
||||||
|
/* #undef HAVE_PCRE2POSIX_H */
|
||||||
|
|
||||||
/* Define to 1 if you have the `pipe' function. */
|
/* Define to 1 if you have the `pipe' function. */
|
||||||
#define HAVE_PIPE 1
|
#define HAVE_PIPE 1
|
||||||
|
|
||||||
@ -1029,6 +1040,12 @@ typedef uint64_t uintmax_t;
|
|||||||
/* Define to 1 if you have the `strrchr' function. */
|
/* Define to 1 if you have the `strrchr' function. */
|
||||||
#define HAVE_STRRCHR 1
|
#define HAVE_STRRCHR 1
|
||||||
|
|
||||||
|
/* Define to 1 if the system has the type `struct statfs'. */
|
||||||
|
/* #undef HAVE_STRUCT_STATFS */
|
||||||
|
|
||||||
|
/* Define to 1 if `f_iosize' is a member of `struct statfs'. */
|
||||||
|
/* #undef HAVE_STRUCT_STATFS_F_IOSIZE */
|
||||||
|
|
||||||
/* Define to 1 if `f_namemax' is a member of `struct statfs'. */
|
/* Define to 1 if `f_namemax' is a member of `struct statfs'. */
|
||||||
/* #undef HAVE_STRUCT_STATFS_F_NAMEMAX */
|
/* #undef HAVE_STRUCT_STATFS_F_NAMEMAX */
|
||||||
|
|
||||||
@ -1077,6 +1094,9 @@ typedef uint64_t uintmax_t;
|
|||||||
/* Define to 1 if you have the `symlink' function. */
|
/* Define to 1 if you have the `symlink' function. */
|
||||||
#define HAVE_SYMLINK 1
|
#define HAVE_SYMLINK 1
|
||||||
|
|
||||||
|
/* Define to 1 if you have the `sysconf' function. */
|
||||||
|
#define HAVE_SYSCONF 1
|
||||||
|
|
||||||
/* Define to 1 if you have the <sys/acl.h> header file. */
|
/* Define to 1 if you have the <sys/acl.h> header file. */
|
||||||
/* #undef HAVE_SYS_ACL_H */
|
/* #undef HAVE_SYS_ACL_H */
|
||||||
|
|
||||||
@ -1276,10 +1296,10 @@ typedef uint64_t uintmax_t;
|
|||||||
#define ICONV_CONST
|
#define ICONV_CONST
|
||||||
|
|
||||||
/* Version number of libarchive as a single integer */
|
/* Version number of libarchive as a single integer */
|
||||||
#define LIBARCHIVE_VERSION_NUMBER "3007000"
|
#define LIBARCHIVE_VERSION_NUMBER "3007004"
|
||||||
|
|
||||||
/* Version number of libarchive */
|
/* Version number of libarchive */
|
||||||
#define LIBARCHIVE_VERSION_STRING "3.7.0"
|
#define LIBARCHIVE_VERSION_STRING "3.7.4"
|
||||||
|
|
||||||
/* Define to 1 if `lstat' dereferences a symlink specified with a trailing
|
/* Define to 1 if `lstat' dereferences a symlink specified with a trailing
|
||||||
slash. */
|
slash. */
|
||||||
@ -1333,7 +1353,7 @@ typedef uint64_t uintmax_t;
|
|||||||
#endif /* SAFE_TO_DEFINE_EXTENSIONS */
|
#endif /* SAFE_TO_DEFINE_EXTENSIONS */
|
||||||
|
|
||||||
/* Version number of package */
|
/* Version number of package */
|
||||||
#define VERSION "3.7.0"
|
#define VERSION "3.7.4"
|
||||||
|
|
||||||
/* Number of bits in a file offset, on hosts where this is settable. */
|
/* Number of bits in a file offset, on hosts where this is settable. */
|
||||||
/* #undef _FILE_OFFSET_BITS */
|
/* #undef _FILE_OFFSET_BITS */
|
||||||
|
@ -124,6 +124,8 @@ function setup_logs_replication
|
|||||||
check_logs_credentials || return 0
|
check_logs_credentials || return 0
|
||||||
__set_connection_args
|
__set_connection_args
|
||||||
|
|
||||||
|
echo "My hostname is ${HOSTNAME}"
|
||||||
|
|
||||||
echo 'Create all configured system logs'
|
echo 'Create all configured system logs'
|
||||||
clickhouse-client --query "SYSTEM FLUSH LOGS"
|
clickhouse-client --query "SYSTEM FLUSH LOGS"
|
||||||
|
|
||||||
@ -184,7 +186,12 @@ function setup_logs_replication
|
|||||||
/^TTL /d
|
/^TTL /d
|
||||||
')
|
')
|
||||||
|
|
||||||
echo -e "Creating remote destination table ${table}_${hash} with statement:\n${statement}" >&2
|
echo -e "Creating remote destination table ${table}_${hash} with statement:" >&2
|
||||||
|
echo "::group::${table}"
|
||||||
|
# there's the only way big "$statement" can be printed without causing EAGAIN error
|
||||||
|
# cat: write error: Resource temporarily unavailable
|
||||||
|
echo "$statement" | cat
|
||||||
|
echo "::endgroup::"
|
||||||
|
|
||||||
echo "$statement" | clickhouse-client --database_replicated_initial_query_timeout_sec=10 \
|
echo "$statement" | clickhouse-client --database_replicated_initial_query_timeout_sec=10 \
|
||||||
--distributed_ddl_task_timeout=30 --distributed_ddl_output_mode=throw_only_active \
|
--distributed_ddl_task_timeout=30 --distributed_ddl_output_mode=throw_only_active \
|
||||||
|
@ -55,10 +55,29 @@ void CompressedWriteBuffer::nextImpl()
|
|||||||
|
|
||||||
out.write(compressed_buffer.data(), compressed_size);
|
out.write(compressed_buffer.data(), compressed_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Increase buffer size for next data if adaptive buffer size is used and nextImpl was called because of end of buffer.
|
||||||
|
if (!available() && use_adaptive_buffer_size && memory.size() < adaptive_buffer_max_size)
|
||||||
|
{
|
||||||
|
memory.resize(std::min(memory.size() * 2, adaptive_buffer_max_size));
|
||||||
|
BufferBase::set(memory.data(), memory.size(), 0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
CompressedWriteBuffer::CompressedWriteBuffer(WriteBuffer & out_, CompressionCodecPtr codec_, size_t buf_size)
|
void CompressedWriteBuffer::finalizeImpl()
|
||||||
: BufferWithOwnMemory<WriteBuffer>(buf_size), out(out_), codec(std::move(codec_))
|
{
|
||||||
|
/// Don't try to resize buffer in nextImpl.
|
||||||
|
use_adaptive_buffer_size = false;
|
||||||
|
next();
|
||||||
|
}
|
||||||
|
|
||||||
|
CompressedWriteBuffer::CompressedWriteBuffer(
|
||||||
|
WriteBuffer & out_, CompressionCodecPtr codec_, size_t buf_size, bool use_adaptive_buffer_size_, size_t adaptive_buffer_initial_size)
|
||||||
|
: BufferWithOwnMemory<WriteBuffer>(use_adaptive_buffer_size_ ? adaptive_buffer_initial_size : buf_size)
|
||||||
|
, out(out_)
|
||||||
|
, codec(std::move(codec_))
|
||||||
|
, use_adaptive_buffer_size(use_adaptive_buffer_size_)
|
||||||
|
, adaptive_buffer_max_size(buf_size)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -19,7 +19,9 @@ public:
|
|||||||
explicit CompressedWriteBuffer(
|
explicit CompressedWriteBuffer(
|
||||||
WriteBuffer & out_,
|
WriteBuffer & out_,
|
||||||
CompressionCodecPtr codec_ = CompressionCodecFactory::instance().getDefaultCodec(),
|
CompressionCodecPtr codec_ = CompressionCodecFactory::instance().getDefaultCodec(),
|
||||||
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
|
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||||
|
bool use_adaptive_buffer_size_ = false,
|
||||||
|
size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE);
|
||||||
|
|
||||||
~CompressedWriteBuffer() override;
|
~CompressedWriteBuffer() override;
|
||||||
|
|
||||||
@ -45,10 +47,17 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
void nextImpl() override;
|
void nextImpl() override;
|
||||||
|
void finalizeImpl() override;
|
||||||
|
|
||||||
WriteBuffer & out;
|
WriteBuffer & out;
|
||||||
CompressionCodecPtr codec;
|
CompressionCodecPtr codec;
|
||||||
|
|
||||||
|
/// If true, the size of internal buffer will be exponentially increased up to
|
||||||
|
/// adaptive_buffer_max_size after each nextImpl call. It can be used to avoid
|
||||||
|
/// large buffer allocation when actual size of written data is small.
|
||||||
|
bool use_adaptive_buffer_size;
|
||||||
|
size_t adaptive_buffer_max_size;
|
||||||
|
|
||||||
PODArray<char> compressed_buffer;
|
PODArray<char> compressed_buffer;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -20,6 +20,9 @@ static constexpr auto DBMS_DEFAULT_POLL_INTERVAL = 10;
|
|||||||
/// The size of the I/O buffer by default.
|
/// The size of the I/O buffer by default.
|
||||||
static constexpr auto DBMS_DEFAULT_BUFFER_SIZE = 1048576ULL;
|
static constexpr auto DBMS_DEFAULT_BUFFER_SIZE = 1048576ULL;
|
||||||
|
|
||||||
|
/// The initial size of adaptive I/O buffer by default.
|
||||||
|
static constexpr auto DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE = 16384ULL;
|
||||||
|
|
||||||
static constexpr auto PADDING_FOR_SIMD = 64;
|
static constexpr auto PADDING_FOR_SIMD = 64;
|
||||||
|
|
||||||
/** Which blocks by default read the data (by number of rows).
|
/** Which blocks by default read the data (by number of rows).
|
||||||
|
@ -84,7 +84,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
|
|||||||
},
|
},
|
||||||
{"24.8",
|
{"24.8",
|
||||||
{
|
{
|
||||||
{"rows_before_aggregation", true, true, "Provide exact value for rows_before_aggregation statistic, represents the number of rows read before aggregation"},
|
{"rows_before_aggregation", false, false, "Provide exact value for rows_before_aggregation statistic, represents the number of rows read before aggregation"},
|
||||||
{"restore_replace_external_table_functions_to_null", false, false, "New setting."},
|
{"restore_replace_external_table_functions_to_null", false, false, "New setting."},
|
||||||
{"restore_replace_external_engines_to_null", false, false, "New setting."},
|
{"restore_replace_external_engines_to_null", false, false, "New setting."},
|
||||||
{"input_format_json_max_depth", 1000000, 1000, "It was unlimited in previous versions, but that was unsafe."},
|
{"input_format_json_max_depth", 1000000, 1000, "It was unlimited in previous versions, but that was unsafe."},
|
||||||
|
@ -420,6 +420,21 @@ bool ISerialization::isEphemeralSubcolumn(const DB::ISerialization::SubstreamPat
|
|||||||
return path[last_elem].type == Substream::VariantElementNullMap;
|
return path[last_elem].type == Substream::VariantElementNullMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool ISerialization::isDynamicSubcolumn(const DB::ISerialization::SubstreamPath & path, size_t prefix_len)
|
||||||
|
{
|
||||||
|
if (prefix_len == 0 || prefix_len > path.size())
|
||||||
|
return false;
|
||||||
|
|
||||||
|
for (size_t i = 0; i != prefix_len; ++i)
|
||||||
|
{
|
||||||
|
if (path[i].type == SubstreamType::DynamicData || path[i].type == SubstreamType::DynamicStructure
|
||||||
|
|| path[i].type == SubstreamType::ObjectData || path[i].type == SubstreamType::ObjectStructure)
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
ISerialization::SubstreamData ISerialization::createFromPath(const SubstreamPath & path, size_t prefix_len)
|
ISerialization::SubstreamData ISerialization::createFromPath(const SubstreamPath & path, size_t prefix_len)
|
||||||
{
|
{
|
||||||
assert(prefix_len <= path.size());
|
assert(prefix_len <= path.size());
|
||||||
|
@ -457,6 +457,9 @@ public:
|
|||||||
/// for writing/reading data. For example, it's a null-map subcolumn of Variant type (it's always constructed from discriminators);.
|
/// for writing/reading data. For example, it's a null-map subcolumn of Variant type (it's always constructed from discriminators);.
|
||||||
static bool isEphemeralSubcolumn(const SubstreamPath & path, size_t prefix_len);
|
static bool isEphemeralSubcolumn(const SubstreamPath & path, size_t prefix_len);
|
||||||
|
|
||||||
|
/// Returns true if stream with specified path corresponds to dynamic subcolumn.
|
||||||
|
static bool isDynamicSubcolumn(const SubstreamPath & path, size_t prefix_len);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
template <typename State, typename StatePtr>
|
template <typename State, typename StatePtr>
|
||||||
State * checkAndGetState(const StatePtr & state) const;
|
State * checkAndGetState(const StatePtr & state) const;
|
||||||
|
@ -339,7 +339,15 @@ DiskLocal::writeFile(const String & path, size_t buf_size, WriteMode mode, const
|
|||||||
{
|
{
|
||||||
int flags = (mode == WriteMode::Append) ? (O_APPEND | O_CREAT | O_WRONLY) : -1;
|
int flags = (mode == WriteMode::Append) ? (O_APPEND | O_CREAT | O_WRONLY) : -1;
|
||||||
return std::make_unique<WriteBufferFromFile>(
|
return std::make_unique<WriteBufferFromFile>(
|
||||||
fs::path(disk_path) / path, buf_size, flags, settings.local_throttler);
|
fs::path(disk_path) / path,
|
||||||
|
buf_size,
|
||||||
|
flags,
|
||||||
|
settings.local_throttler,
|
||||||
|
0666,
|
||||||
|
nullptr,
|
||||||
|
0,
|
||||||
|
settings.use_adaptive_write_buffer,
|
||||||
|
settings.adaptive_write_buffer_initial_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<String> DiskLocal::getBlobPath(const String & path) const
|
std::vector<String> DiskLocal::getBlobPath(const String & path) const
|
||||||
|
@ -59,7 +59,7 @@ WriteBufferFromAzureBlobStorage::WriteBufferFromAzureBlobStorage(
|
|||||||
const WriteSettings & write_settings_,
|
const WriteSettings & write_settings_,
|
||||||
std::shared_ptr<const AzureBlobStorage::RequestSettings> settings_,
|
std::shared_ptr<const AzureBlobStorage::RequestSettings> settings_,
|
||||||
ThreadPoolCallbackRunnerUnsafe<void> schedule_)
|
ThreadPoolCallbackRunnerUnsafe<void> schedule_)
|
||||||
: WriteBufferFromFileBase(buf_size_, nullptr, 0)
|
: WriteBufferFromFileBase(std::min(buf_size_, static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE)), nullptr, 0)
|
||||||
, log(getLogger("WriteBufferFromAzureBlobStorage"))
|
, log(getLogger("WriteBufferFromAzureBlobStorage"))
|
||||||
, buffer_allocation_policy(createBufferAllocationPolicy(*settings_))
|
, buffer_allocation_policy(createBufferAllocationPolicy(*settings_))
|
||||||
, max_single_part_upload_size(settings_->max_single_part_upload_size)
|
, max_single_part_upload_size(settings_->max_single_part_upload_size)
|
||||||
@ -244,11 +244,21 @@ void WriteBufferFromAzureBlobStorage::allocateBuffer()
|
|||||||
buffer_allocation_policy->nextBuffer();
|
buffer_allocation_policy->nextBuffer();
|
||||||
chassert(0 == hidden_size);
|
chassert(0 == hidden_size);
|
||||||
|
|
||||||
auto size = buffer_allocation_policy->getBufferSize();
|
/// First buffer was already allocated in BufferWithOwnMemory constructor with buffer size provided in constructor.
|
||||||
|
/// It will be reallocated in subsequent nextImpl calls up to the desired buffer size from buffer_allocation_policy.
|
||||||
if (buffer_allocation_policy->getBufferNumber() == 1)
|
if (buffer_allocation_policy->getBufferNumber() == 1)
|
||||||
size = std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), size);
|
{
|
||||||
|
/// Reduce memory size if initial size was larger then desired size from buffer_allocation_policy.
|
||||||
|
/// Usually it doesn't happen but we have it in unit tests.
|
||||||
|
if (memory.size() > buffer_allocation_policy->getBufferSize())
|
||||||
|
{
|
||||||
|
memory.resize(buffer_allocation_policy->getBufferSize());
|
||||||
|
WriteBuffer::set(memory.data(), memory.size());
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto size = buffer_allocation_policy->getBufferSize();
|
||||||
memory = Memory(size);
|
memory = Memory(size);
|
||||||
WriteBuffer::set(memory.data(), memory.size());
|
WriteBuffer::set(memory.data(), memory.size());
|
||||||
}
|
}
|
||||||
|
@ -289,7 +289,7 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NO
|
|||||||
return std::make_unique<WriteBufferFromAzureBlobStorage>(
|
return std::make_unique<WriteBufferFromAzureBlobStorage>(
|
||||||
client.get(),
|
client.get(),
|
||||||
object.remote_path,
|
object.remote_path,
|
||||||
buf_size,
|
write_settings.use_adaptive_write_buffer ? write_settings.adaptive_write_buffer_initial_size : buf_size,
|
||||||
patchSettings(write_settings),
|
patchSettings(write_settings),
|
||||||
settings.get(),
|
settings.get(),
|
||||||
std::move(scheduler));
|
std::move(scheduler));
|
||||||
|
@ -282,7 +282,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
|
|||||||
client.get(),
|
client.get(),
|
||||||
uri.bucket,
|
uri.bucket,
|
||||||
object.remote_path,
|
object.remote_path,
|
||||||
buf_size,
|
write_settings.use_adaptive_write_buffer ? write_settings.adaptive_write_buffer_initial_size : buf_size,
|
||||||
request_settings,
|
request_settings,
|
||||||
std::move(blob_storage_log),
|
std::move(blob_storage_log),
|
||||||
attributes,
|
attributes,
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
#include <Functions/IFunction.h>
|
#include <Functions/IFunction.h>
|
||||||
#include <Functions/FunctionFactory.h>
|
#include <Functions/FunctionFactory.h>
|
||||||
#include <Columns/ColumnLowCardinality.h>
|
#include <Columns/ColumnLowCardinality.h>
|
||||||
#include <DataTypes/DataTypeLowCardinality.h>
|
#include <Columns/ColumnSparse.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
@ -18,11 +18,6 @@ public:
|
|||||||
return std::make_shared<FunctionMaterialize>();
|
return std::make_shared<FunctionMaterialize>();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool useDefaultImplementationForNulls() const override
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get the function name.
|
/// Get the function name.
|
||||||
String getName() const override
|
String getName() const override
|
||||||
{
|
{
|
||||||
@ -34,8 +29,16 @@ public:
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool useDefaultImplementationForNulls() const override { return false; }
|
||||||
|
|
||||||
|
bool useDefaultImplementationForNothing() const override { return false; }
|
||||||
|
|
||||||
|
bool useDefaultImplementationForConstants() const override { return false; }
|
||||||
|
|
||||||
bool useDefaultImplementationForLowCardinalityColumns() const override { return false; }
|
bool useDefaultImplementationForLowCardinalityColumns() const override { return false; }
|
||||||
|
|
||||||
|
bool useDefaultImplementationForSparseColumns() const override { return false; }
|
||||||
|
|
||||||
bool isSuitableForConstantFolding() const override { return false; }
|
bool isSuitableForConstantFolding() const override { return false; }
|
||||||
|
|
||||||
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
|
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
|
||||||
@ -52,7 +55,7 @@ public:
|
|||||||
|
|
||||||
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
|
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
|
||||||
{
|
{
|
||||||
return arguments[0].column->convertToFullColumnIfConst();
|
return recursiveRemoveSparse(arguments[0].column->convertToFullColumnIfConst());
|
||||||
}
|
}
|
||||||
|
|
||||||
bool hasInformationAboutMonotonicity() const override { return true; }
|
bool hasInformationAboutMonotonicity() const override { return true; }
|
||||||
|
@ -32,8 +32,10 @@ WriteBufferFromFile::WriteBufferFromFile(
|
|||||||
ThrottlerPtr throttler_,
|
ThrottlerPtr throttler_,
|
||||||
mode_t mode,
|
mode_t mode,
|
||||||
char * existing_memory,
|
char * existing_memory,
|
||||||
size_t alignment)
|
size_t alignment,
|
||||||
: WriteBufferFromFileDescriptor(-1, buf_size, existing_memory, throttler_, alignment, file_name_)
|
bool use_adaptive_buffer_size_,
|
||||||
|
size_t adaptive_buffer_initial_size)
|
||||||
|
: WriteBufferFromFileDescriptor(-1, buf_size, existing_memory, throttler_, alignment, file_name_, use_adaptive_buffer_size_, adaptive_buffer_initial_size)
|
||||||
{
|
{
|
||||||
ProfileEvents::increment(ProfileEvents::FileOpen);
|
ProfileEvents::increment(ProfileEvents::FileOpen);
|
||||||
|
|
||||||
@ -66,8 +68,10 @@ WriteBufferFromFile::WriteBufferFromFile(
|
|||||||
size_t buf_size,
|
size_t buf_size,
|
||||||
ThrottlerPtr throttler_,
|
ThrottlerPtr throttler_,
|
||||||
char * existing_memory,
|
char * existing_memory,
|
||||||
size_t alignment)
|
size_t alignment,
|
||||||
: WriteBufferFromFileDescriptor(fd_, buf_size, existing_memory, throttler_, alignment, original_file_name)
|
bool use_adaptive_buffer_size_,
|
||||||
|
size_t adaptive_buffer_initial_size)
|
||||||
|
: WriteBufferFromFileDescriptor(fd_, buf_size, existing_memory, throttler_, alignment, original_file_name, use_adaptive_buffer_size_, adaptive_buffer_initial_size)
|
||||||
{
|
{
|
||||||
fd_ = -1;
|
fd_ = -1;
|
||||||
}
|
}
|
||||||
|
@ -36,7 +36,9 @@ public:
|
|||||||
ThrottlerPtr throttler_ = {},
|
ThrottlerPtr throttler_ = {},
|
||||||
mode_t mode = 0666,
|
mode_t mode = 0666,
|
||||||
char * existing_memory = nullptr,
|
char * existing_memory = nullptr,
|
||||||
size_t alignment = 0);
|
size_t alignment = 0,
|
||||||
|
bool use_adaptive_buffer_size_ = false,
|
||||||
|
size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE);
|
||||||
|
|
||||||
/// Use pre-opened file descriptor.
|
/// Use pre-opened file descriptor.
|
||||||
explicit WriteBufferFromFile(
|
explicit WriteBufferFromFile(
|
||||||
@ -45,7 +47,9 @@ public:
|
|||||||
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||||
ThrottlerPtr throttler_ = {},
|
ThrottlerPtr throttler_ = {},
|
||||||
char * existing_memory = nullptr,
|
char * existing_memory = nullptr,
|
||||||
size_t alignment = 0);
|
size_t alignment = 0,
|
||||||
|
bool use_adaptive_buffer_size_ = false,
|
||||||
|
size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE);
|
||||||
|
|
||||||
~WriteBufferFromFile() override;
|
~WriteBufferFromFile() override;
|
||||||
|
|
||||||
|
@ -83,6 +83,13 @@ void WriteBufferFromFileDescriptor::nextImpl()
|
|||||||
|
|
||||||
ProfileEvents::increment(ProfileEvents::DiskWriteElapsedMicroseconds, watch.elapsedMicroseconds());
|
ProfileEvents::increment(ProfileEvents::DiskWriteElapsedMicroseconds, watch.elapsedMicroseconds());
|
||||||
ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteBytes, bytes_written);
|
ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteBytes, bytes_written);
|
||||||
|
|
||||||
|
/// Increase buffer size for next data if adaptive buffer size is used and nextImpl was called because of end of buffer.
|
||||||
|
if (!available() && use_adaptive_buffer_size && memory.size() < adaptive_max_buffer_size)
|
||||||
|
{
|
||||||
|
memory.resize(std::min(memory.size() * 2, adaptive_max_buffer_size));
|
||||||
|
BufferBase::set(memory.data(), memory.size(), 0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// NOTE: This class can be used as a very low-level building block, for example
|
/// NOTE: This class can be used as a very low-level building block, for example
|
||||||
@ -94,11 +101,15 @@ WriteBufferFromFileDescriptor::WriteBufferFromFileDescriptor(
|
|||||||
char * existing_memory,
|
char * existing_memory,
|
||||||
ThrottlerPtr throttler_,
|
ThrottlerPtr throttler_,
|
||||||
size_t alignment,
|
size_t alignment,
|
||||||
std::string file_name_)
|
std::string file_name_,
|
||||||
: WriteBufferFromFileBase(buf_size, existing_memory, alignment)
|
bool use_adaptive_buffer_size_,
|
||||||
|
size_t adaptive_buffer_initial_size)
|
||||||
|
: WriteBufferFromFileBase(use_adaptive_buffer_size_ ? adaptive_buffer_initial_size : buf_size, existing_memory, alignment)
|
||||||
, fd(fd_)
|
, fd(fd_)
|
||||||
, throttler(throttler_)
|
, throttler(throttler_)
|
||||||
, file_name(std::move(file_name_))
|
, file_name(std::move(file_name_))
|
||||||
|
, use_adaptive_buffer_size(use_adaptive_buffer_size_)
|
||||||
|
, adaptive_max_buffer_size(buf_size)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -124,6 +135,7 @@ void WriteBufferFromFileDescriptor::finalizeImpl()
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
use_adaptive_buffer_size = false;
|
||||||
next();
|
next();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -18,7 +18,9 @@ public:
|
|||||||
char * existing_memory = nullptr,
|
char * existing_memory = nullptr,
|
||||||
ThrottlerPtr throttler_ = {},
|
ThrottlerPtr throttler_ = {},
|
||||||
size_t alignment = 0,
|
size_t alignment = 0,
|
||||||
std::string file_name_ = "");
|
std::string file_name_ = "",
|
||||||
|
bool use_adaptive_buffer_size_ = false,
|
||||||
|
size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE);
|
||||||
|
|
||||||
/** Could be used before initialization if needed 'fd' was not passed to constructor.
|
/** Could be used before initialization if needed 'fd' was not passed to constructor.
|
||||||
* It's not possible to change 'fd' during work.
|
* It's not possible to change 'fd' during work.
|
||||||
@ -56,6 +58,12 @@ protected:
|
|||||||
/// If file has name contains filename, otherwise contains string "(fd=...)"
|
/// If file has name contains filename, otherwise contains string "(fd=...)"
|
||||||
std::string file_name;
|
std::string file_name;
|
||||||
|
|
||||||
|
/// If true, the size of internal buffer will be exponentially increased up to
|
||||||
|
/// adaptive_buffer_max_size after each nextImpl call. It can be used to avoid
|
||||||
|
/// large buffer allocation when actual size of written data is small.
|
||||||
|
bool use_adaptive_buffer_size;
|
||||||
|
size_t adaptive_max_buffer_size;
|
||||||
|
|
||||||
void finalizeImpl() override;
|
void finalizeImpl() override;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -95,7 +95,7 @@ WriteBufferFromS3::WriteBufferFromS3(
|
|||||||
std::optional<std::map<String, String>> object_metadata_,
|
std::optional<std::map<String, String>> object_metadata_,
|
||||||
ThreadPoolCallbackRunnerUnsafe<void> schedule_,
|
ThreadPoolCallbackRunnerUnsafe<void> schedule_,
|
||||||
const WriteSettings & write_settings_)
|
const WriteSettings & write_settings_)
|
||||||
: WriteBufferFromFileBase(buf_size_, nullptr, 0)
|
: WriteBufferFromFileBase(std::min(buf_size_, static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE)), nullptr, 0)
|
||||||
, bucket(bucket_)
|
, bucket(bucket_)
|
||||||
, key(key_)
|
, key(key_)
|
||||||
, request_settings(request_settings_)
|
, request_settings(request_settings_)
|
||||||
@ -351,9 +351,17 @@ void WriteBufferFromS3::allocateBuffer()
|
|||||||
buffer_allocation_policy->nextBuffer();
|
buffer_allocation_policy->nextBuffer();
|
||||||
chassert(0 == hidden_size);
|
chassert(0 == hidden_size);
|
||||||
|
|
||||||
|
/// First buffer was already allocated in BufferWithOwnMemory constructor with provided in constructor buffer size.
|
||||||
|
/// It will be reallocated in subsequent nextImpl calls up to the desired buffer size from buffer_allocation_policy.
|
||||||
if (buffer_allocation_policy->getBufferNumber() == 1)
|
if (buffer_allocation_policy->getBufferNumber() == 1)
|
||||||
{
|
{
|
||||||
allocateFirstBuffer();
|
/// Reduce memory size if initial size was larger then desired size from buffer_allocation_policy.
|
||||||
|
/// Usually it doesn't happen but we have it in unit tests.
|
||||||
|
if (memory.size() > buffer_allocation_policy->getBufferSize())
|
||||||
|
{
|
||||||
|
memory.resize(buffer_allocation_policy->getBufferSize());
|
||||||
|
WriteBuffer::set(memory.data(), memory.size());
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -361,14 +369,6 @@ void WriteBufferFromS3::allocateBuffer()
|
|||||||
WriteBuffer::set(memory.data(), memory.size());
|
WriteBuffer::set(memory.data(), memory.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
void WriteBufferFromS3::allocateFirstBuffer()
|
|
||||||
{
|
|
||||||
const auto max_first_buffer = buffer_allocation_policy->getBufferSize();
|
|
||||||
const auto size = std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), max_first_buffer);
|
|
||||||
memory = Memory(size);
|
|
||||||
WriteBuffer::set(memory.data(), memory.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
void WriteBufferFromS3::setFakeBufferWhenPreFinalized()
|
void WriteBufferFromS3::setFakeBufferWhenPreFinalized()
|
||||||
{
|
{
|
||||||
WriteBuffer::set(fake_buffer_when_prefinalized, sizeof(fake_buffer_when_prefinalized));
|
WriteBuffer::set(fake_buffer_when_prefinalized, sizeof(fake_buffer_when_prefinalized));
|
||||||
|
@ -64,7 +64,6 @@ private:
|
|||||||
void reallocateFirstBuffer();
|
void reallocateFirstBuffer();
|
||||||
void detachBuffer();
|
void detachBuffer();
|
||||||
void allocateBuffer();
|
void allocateBuffer();
|
||||||
void allocateFirstBuffer();
|
|
||||||
void setFakeBufferWhenPreFinalized();
|
void setFakeBufferWhenPreFinalized();
|
||||||
|
|
||||||
S3::UploadPartRequest getUploadRequest(size_t part_number, PartData & data);
|
S3::UploadPartRequest getUploadRequest(size_t part_number, PartData & data);
|
||||||
|
@ -54,7 +54,7 @@ inline void WriteBufferValidUTF8::putReplacement()
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
inline void WriteBufferValidUTF8::putValid(char *data, size_t len)
|
inline void WriteBufferValidUTF8::putValid(const char *data, size_t len)
|
||||||
{
|
{
|
||||||
if (len == 0)
|
if (len == 0)
|
||||||
return;
|
return;
|
||||||
@ -149,9 +149,34 @@ void WriteBufferValidUTF8::finalizeImpl()
|
|||||||
/// Write all complete sequences from buffer.
|
/// Write all complete sequences from buffer.
|
||||||
nextImpl();
|
nextImpl();
|
||||||
|
|
||||||
/// If unfinished sequence at end, then write replacement.
|
/// Handle remaining bytes if we have an incomplete sequence
|
||||||
if (working_buffer.begin() != memory.data())
|
if (working_buffer.begin() != memory.data())
|
||||||
|
{
|
||||||
|
const char * p = memory.data();
|
||||||
|
|
||||||
|
while (p < pos)
|
||||||
|
{
|
||||||
|
UInt8 len = length_of_utf8_sequence[static_cast<const unsigned char>(*p)];
|
||||||
|
if (p + len > pos)
|
||||||
|
{
|
||||||
|
/// Incomplete sequence. Skip one byte.
|
||||||
putReplacement();
|
putReplacement();
|
||||||
|
++p;
|
||||||
|
}
|
||||||
|
else if (Poco::UTF8Encoding::isLegal(reinterpret_cast<const unsigned char *>(p), len))
|
||||||
|
{
|
||||||
|
/// Valid sequence
|
||||||
|
putValid(p, len);
|
||||||
|
p += len;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/// Invalid sequence, skip first byte.
|
||||||
|
putReplacement();
|
||||||
|
++p;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -26,7 +26,7 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
void putReplacement();
|
void putReplacement();
|
||||||
void putValid(char * data, size_t len);
|
void putValid(const char * data, size_t len);
|
||||||
|
|
||||||
void nextImpl() override;
|
void nextImpl() override;
|
||||||
void finalizeImpl() override;
|
void finalizeImpl() override;
|
||||||
|
@ -24,6 +24,9 @@ struct WriteSettings
|
|||||||
bool s3_allow_parallel_part_upload = true;
|
bool s3_allow_parallel_part_upload = true;
|
||||||
bool azure_allow_parallel_part_upload = true;
|
bool azure_allow_parallel_part_upload = true;
|
||||||
|
|
||||||
|
bool use_adaptive_write_buffer = false;
|
||||||
|
size_t adaptive_write_buffer_initial_size = 16 * 1024;
|
||||||
|
|
||||||
bool operator==(const WriteSettings & other) const = default;
|
bool operator==(const WriteSettings & other) const = default;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -85,11 +85,11 @@ MergeTreeDataPartWriterOnDisk::Stream<false>::Stream(
|
|||||||
marks_file_extension{marks_file_extension_},
|
marks_file_extension{marks_file_extension_},
|
||||||
plain_file(data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)),
|
plain_file(data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)),
|
||||||
plain_hashing(*plain_file),
|
plain_hashing(*plain_file),
|
||||||
compressor(plain_hashing, compression_codec_, max_compress_block_size_),
|
compressor(plain_hashing, compression_codec_, max_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size),
|
||||||
compressed_hashing(compressor),
|
compressed_hashing(compressor),
|
||||||
marks_file(data_part_storage->writeFile(marks_path_ + marks_file_extension, 4096, query_write_settings)),
|
marks_file(data_part_storage->writeFile(marks_path_ + marks_file_extension, 4096, query_write_settings)),
|
||||||
marks_hashing(*marks_file),
|
marks_hashing(*marks_file),
|
||||||
marks_compressor(marks_hashing, marks_compression_codec_, marks_compress_block_size_),
|
marks_compressor(marks_hashing, marks_compression_codec_, marks_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size),
|
||||||
marks_compressed_hashing(marks_compressor),
|
marks_compressed_hashing(marks_compressor),
|
||||||
compress_marks(MarkType(marks_file_extension).compressed)
|
compress_marks(MarkType(marks_file_extension).compressed)
|
||||||
{
|
{
|
||||||
@ -108,7 +108,7 @@ MergeTreeDataPartWriterOnDisk::Stream<true>::Stream(
|
|||||||
data_file_extension{data_file_extension_},
|
data_file_extension{data_file_extension_},
|
||||||
plain_file(data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)),
|
plain_file(data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)),
|
||||||
plain_hashing(*plain_file),
|
plain_hashing(*plain_file),
|
||||||
compressor(plain_hashing, compression_codec_, max_compress_block_size_),
|
compressor(plain_hashing, compression_codec_, max_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size),
|
||||||
compressed_hashing(compressor),
|
compressed_hashing(compressor),
|
||||||
compress_marks(false)
|
compress_marks(false)
|
||||||
{
|
{
|
||||||
|
@ -177,6 +177,10 @@ void MergeTreeDataPartWriterWide::addStreams(
|
|||||||
if (!max_compress_block_size)
|
if (!max_compress_block_size)
|
||||||
max_compress_block_size = settings.max_compress_block_size;
|
max_compress_block_size = settings.max_compress_block_size;
|
||||||
|
|
||||||
|
WriteSettings query_write_settings = settings.query_write_settings;
|
||||||
|
query_write_settings.use_adaptive_write_buffer = settings.use_adaptive_write_buffer_for_dynamic_subcolumns && ISerialization::isDynamicSubcolumn(substream_path, substream_path.size());
|
||||||
|
query_write_settings.adaptive_write_buffer_initial_size = settings.adaptive_write_buffer_initial_size;
|
||||||
|
|
||||||
column_streams[stream_name] = std::make_unique<Stream<false>>(
|
column_streams[stream_name] = std::make_unique<Stream<false>>(
|
||||||
stream_name,
|
stream_name,
|
||||||
data_part_storage,
|
data_part_storage,
|
||||||
@ -186,7 +190,7 @@ void MergeTreeDataPartWriterWide::addStreams(
|
|||||||
max_compress_block_size,
|
max_compress_block_size,
|
||||||
marks_compression_codec,
|
marks_compression_codec,
|
||||||
settings.marks_compress_block_size,
|
settings.marks_compress_block_size,
|
||||||
settings.query_write_settings);
|
query_write_settings);
|
||||||
|
|
||||||
full_name_to_stream_name.emplace(full_stream_name, stream_name);
|
full_name_to_stream_name.emplace(full_stream_name, stream_name);
|
||||||
stream_name_to_full_name.emplace(stream_name, full_stream_name);
|
stream_name_to_full_name.emplace(stream_name, full_stream_name);
|
||||||
|
@ -30,6 +30,8 @@ MergeTreeWriterSettings::MergeTreeWriterSettings(
|
|||||||
, low_cardinality_max_dictionary_size(global_settings.low_cardinality_max_dictionary_size)
|
, low_cardinality_max_dictionary_size(global_settings.low_cardinality_max_dictionary_size)
|
||||||
, low_cardinality_use_single_dictionary_for_part(global_settings.low_cardinality_use_single_dictionary_for_part != 0)
|
, low_cardinality_use_single_dictionary_for_part(global_settings.low_cardinality_use_single_dictionary_for_part != 0)
|
||||||
, use_compact_variant_discriminators_serialization(storage_settings->use_compact_variant_discriminators_serialization)
|
, use_compact_variant_discriminators_serialization(storage_settings->use_compact_variant_discriminators_serialization)
|
||||||
|
, use_adaptive_write_buffer_for_dynamic_subcolumns(storage_settings->use_adaptive_write_buffer_for_dynamic_subcolumns)
|
||||||
|
, adaptive_write_buffer_initial_size(storage_settings->adaptive_write_buffer_initial_size)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -80,6 +80,8 @@ struct MergeTreeWriterSettings
|
|||||||
size_t low_cardinality_max_dictionary_size;
|
size_t low_cardinality_max_dictionary_size;
|
||||||
bool low_cardinality_use_single_dictionary_for_part;
|
bool low_cardinality_use_single_dictionary_for_part;
|
||||||
bool use_compact_variant_discriminators_serialization;
|
bool use_compact_variant_discriminators_serialization;
|
||||||
|
bool use_adaptive_write_buffer_for_dynamic_subcolumns;
|
||||||
|
size_t adaptive_write_buffer_initial_size;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -99,6 +99,8 @@ struct Settings;
|
|||||||
M(Bool, add_implicit_sign_column_constraint_for_collapsing_engine, false, "If true, add implicit constraint for sign column for CollapsingMergeTree engine.", 0) \
|
M(Bool, add_implicit_sign_column_constraint_for_collapsing_engine, false, "If true, add implicit constraint for sign column for CollapsingMergeTree engine.", 0) \
|
||||||
M(Milliseconds, sleep_before_commit_local_part_in_replicated_table_ms, 0, "For testing. Do not change it.", 0) \
|
M(Milliseconds, sleep_before_commit_local_part_in_replicated_table_ms, 0, "For testing. Do not change it.", 0) \
|
||||||
M(Bool, optimize_row_order, false, "Allow reshuffling of rows during part inserts and merges to improve the compressibility of the new part", 0) \
|
M(Bool, optimize_row_order, false, "Allow reshuffling of rows during part inserts and merges to improve the compressibility of the new part", 0) \
|
||||||
|
M(Bool, use_adaptive_write_buffer_for_dynamic_subcolumns, true, "Allow to use adaptive writer buffers during writing dynamic subcolumns to reduce memory usage", 0) \
|
||||||
|
M(UInt64, adaptive_write_buffer_initial_size, 16 * 1024, "Initial size of an adaptive write buffer", 0) \
|
||||||
\
|
\
|
||||||
/* Part removal settings. */ \
|
/* Part removal settings. */ \
|
||||||
M(UInt64, simultaneous_parts_removal_limit, 0, "Maximum number of parts to remove during one CleanupThread iteration (0 means unlimited).", 0) \
|
M(UInt64, simultaneous_parts_removal_limit, 0, "Maximum number of parts to remove during one CleanupThread iteration (0 means unlimited).", 0) \
|
||||||
|
@ -11,7 +11,7 @@ create table t (x UInt64, s String) engine = MergeTree order by x SETTINGS index
|
|||||||
INSERT INTO t SELECT
|
INSERT INTO t SELECT
|
||||||
number,
|
number,
|
||||||
if(number < (8129 * 1024), arrayStringConcat(arrayMap(x -> toString(x), range(number % 128)), ' '), '')
|
if(number < (8129 * 1024), arrayStringConcat(arrayMap(x -> toString(x), range(number % 128)), ' '), '')
|
||||||
FROM numbers_mt((8129 * 1024) * 3) settings max_insert_threads=8, max_rows_to_read=0;
|
FROM numbers_mt((8129 * 1024) * 3) settings max_insert_threads=8, max_rows_to_read=0, max_memory_usage='10Gi';
|
||||||
|
|
||||||
-- optimize table t final;
|
-- optimize table t final;
|
||||||
|
|
||||||
|
@ -0,0 +1,50 @@
|
|||||||
|
-- { echoOn }
|
||||||
|
|
||||||
|
SELECT dumpColumnStructure(id) FROM sparse_t;
|
||||||
|
UInt64, Sparse(size = 2, UInt64(size = 2), UInt64(size = 1))
|
||||||
|
UInt64, Sparse(size = 2, UInt64(size = 2), UInt64(size = 1))
|
||||||
|
SELECT dumpColumnStructure(materialize(id)) FROM sparse_t;
|
||||||
|
UInt64, UInt64(size = 2)
|
||||||
|
UInt64, UInt64(size = 2)
|
||||||
|
SELECT dumpColumnStructure(u) FROM sparse_t;
|
||||||
|
UInt64, Sparse(size = 2, UInt64(size = 1), UInt64(size = 0))
|
||||||
|
UInt64, Sparse(size = 2, UInt64(size = 1), UInt64(size = 0))
|
||||||
|
SELECT dumpColumnStructure(materialize(u)) FROM sparse_t;
|
||||||
|
UInt64, UInt64(size = 2)
|
||||||
|
UInt64, UInt64(size = 2)
|
||||||
|
SELECT dumpColumnStructure(s) FROM sparse_t;
|
||||||
|
String, Sparse(size = 2, String(size = 2), UInt64(size = 1))
|
||||||
|
String, Sparse(size = 2, String(size = 2), UInt64(size = 1))
|
||||||
|
SELECT dumpColumnStructure(materialize(s)) FROM sparse_t;
|
||||||
|
String, String(size = 2)
|
||||||
|
String, String(size = 2)
|
||||||
|
SELECT dumpColumnStructure(arr1) FROM sparse_t;
|
||||||
|
Array(String), Array(size = 2, UInt64(size = 2), String(size = 1))
|
||||||
|
Array(String), Array(size = 2, UInt64(size = 2), String(size = 1))
|
||||||
|
SELECT dumpColumnStructure(materialize(arr1)) FROM sparse_t;
|
||||||
|
Array(String), Array(size = 2, UInt64(size = 2), String(size = 1))
|
||||||
|
Array(String), Array(size = 2, UInt64(size = 2), String(size = 1))
|
||||||
|
SELECT dumpColumnStructure(arr2) FROM sparse_t;
|
||||||
|
Array(UInt64), Array(size = 2, UInt64(size = 2), UInt64(size = 1))
|
||||||
|
Array(UInt64), Array(size = 2, UInt64(size = 2), UInt64(size = 1))
|
||||||
|
SELECT dumpColumnStructure(materialize(arr2)) FROM sparse_t;
|
||||||
|
Array(UInt64), Array(size = 2, UInt64(size = 2), UInt64(size = 1))
|
||||||
|
Array(UInt64), Array(size = 2, UInt64(size = 2), UInt64(size = 1))
|
||||||
|
SELECT dumpColumnStructure(t) FROM sparse_t;
|
||||||
|
Tuple(a UInt64, s String), Tuple(size = 2, Sparse(size = 2, UInt64(size = 1), UInt64(size = 0)), Sparse(size = 2, String(size = 1), UInt64(size = 0)))
|
||||||
|
Tuple(a UInt64, s String), Tuple(size = 2, Sparse(size = 2, UInt64(size = 1), UInt64(size = 0)), Sparse(size = 2, String(size = 1), UInt64(size = 0)))
|
||||||
|
SELECT dumpColumnStructure(materialize(t)) FROM sparse_t;
|
||||||
|
Tuple(a UInt64, s String), Tuple(size = 2, UInt64(size = 2), String(size = 2))
|
||||||
|
Tuple(a UInt64, s String), Tuple(size = 2, UInt64(size = 2), String(size = 2))
|
||||||
|
SELECT dumpColumnStructure(t.a) FROM sparse_t;
|
||||||
|
UInt64, Sparse(size = 2, UInt64(size = 1), UInt64(size = 0))
|
||||||
|
UInt64, Sparse(size = 2, UInt64(size = 1), UInt64(size = 0))
|
||||||
|
SELECT dumpColumnStructure(materialize(t.a)) FROM sparse_t;
|
||||||
|
UInt64, UInt64(size = 2)
|
||||||
|
UInt64, UInt64(size = 2)
|
||||||
|
SELECT dumpColumnStructure(t.s) FROM sparse_t;
|
||||||
|
String, Sparse(size = 2, String(size = 1), UInt64(size = 0))
|
||||||
|
String, Sparse(size = 2, String(size = 1), UInt64(size = 0))
|
||||||
|
SELECT dumpColumnStructure(materialize(t.s)) FROM sparse_t;
|
||||||
|
String, String(size = 2)
|
||||||
|
String, String(size = 2)
|
@ -0,0 +1,52 @@
|
|||||||
|
DROP TABLE IF EXISTS sparse_t;
|
||||||
|
|
||||||
|
CREATE TABLE sparse_t (
|
||||||
|
id UInt64,
|
||||||
|
u UInt64,
|
||||||
|
s String,
|
||||||
|
arr1 Array(String),
|
||||||
|
arr2 Array(UInt64),
|
||||||
|
t Tuple(a UInt64, s String))
|
||||||
|
ENGINE = MergeTree ORDER BY tuple()
|
||||||
|
SETTINGS ratio_of_defaults_for_sparse_serialization = 0.1;
|
||||||
|
|
||||||
|
INSERT INTO sparse_t SELECT
|
||||||
|
number,
|
||||||
|
if (number % 2 = 0, number, 0),
|
||||||
|
if (number % 2 = 0, toString(number), ''),
|
||||||
|
if (number % 2 = 0, [''], []),
|
||||||
|
if (number % 2 = 0, [0], []),
|
||||||
|
(if (number % 2 = 0, number, 0), '')
|
||||||
|
FROM numbers(2);
|
||||||
|
|
||||||
|
-- { echoOn }
|
||||||
|
|
||||||
|
SELECT dumpColumnStructure(id) FROM sparse_t;
|
||||||
|
SELECT dumpColumnStructure(materialize(id)) FROM sparse_t;
|
||||||
|
|
||||||
|
SELECT dumpColumnStructure(u) FROM sparse_t;
|
||||||
|
SELECT dumpColumnStructure(materialize(u)) FROM sparse_t;
|
||||||
|
|
||||||
|
SELECT dumpColumnStructure(s) FROM sparse_t;
|
||||||
|
SELECT dumpColumnStructure(materialize(s)) FROM sparse_t;
|
||||||
|
|
||||||
|
SELECT dumpColumnStructure(arr1) FROM sparse_t;
|
||||||
|
SELECT dumpColumnStructure(materialize(arr1)) FROM sparse_t;
|
||||||
|
|
||||||
|
SELECT dumpColumnStructure(arr2) FROM sparse_t;
|
||||||
|
SELECT dumpColumnStructure(materialize(arr2)) FROM sparse_t;
|
||||||
|
|
||||||
|
SELECT dumpColumnStructure(t) FROM sparse_t;
|
||||||
|
SELECT dumpColumnStructure(materialize(t)) FROM sparse_t;
|
||||||
|
|
||||||
|
SELECT dumpColumnStructure(t.a) FROM sparse_t;
|
||||||
|
SELECT dumpColumnStructure(materialize(t.a)) FROM sparse_t;
|
||||||
|
|
||||||
|
SELECT dumpColumnStructure(t.s) FROM sparse_t;
|
||||||
|
SELECT dumpColumnStructure(materialize(t.s)) FROM sparse_t;
|
||||||
|
|
||||||
|
-- { echoOff }
|
||||||
|
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS sparse_t
|
||||||
|
;
|
@ -0,0 +1,16 @@
|
|||||||
|
{
|
||||||
|
"meta":
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"name": "unhex('f0')",
|
||||||
|
"type": "String"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
|
||||||
|
"data":
|
||||||
|
[
|
||||||
|
["<22>"]
|
||||||
|
],
|
||||||
|
|
||||||
|
"rows": 1
|
||||||
|
}
|
@ -0,0 +1,2 @@
|
|||||||
|
SET output_format_write_statistics = 0;
|
||||||
|
SELECT unhex('f0') FORMAT JSONCompact;
|
@ -1,3 +1,4 @@
|
|||||||
|
SYSTEM FLUSH LOGS;
|
||||||
SELECT
|
SELECT
|
||||||
is_initial_query,
|
is_initial_query,
|
||||||
count() AS c,
|
count() AS c,
|
||||||
|
Loading…
Reference in New Issue
Block a user