Merge branch 'master' into force-documentation-3

This commit is contained in:
Alexander Gololobov 2022-08-26 20:29:06 +02:00 committed by GitHub
commit 4fef408104
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
53 changed files with 466 additions and 161 deletions

View File

@ -7,12 +7,8 @@
# How to install Ninja on Ubuntu:
# sudo apt-get install ninja-build
# CLion does not support Ninja
# You can add your vote on CLion task tracker:
# https://youtrack.jetbrains.com/issue/CPP-2659
# https://youtrack.jetbrains.com/issue/CPP-870
if (NOT DEFINED ENV{CLION_IDE} AND NOT DEFINED ENV{XCODE_IDE})
if (NOT DEFINED ENV{XCODE_IDE})
find_program(NINJA_PATH ninja)
if (NINJA_PATH)
set(CMAKE_GENERATOR "Ninja" CACHE INTERNAL "")

View File

@ -78,6 +78,7 @@ RUN export CODENAME="$(lsb_release --codename --short | tr 'A-Z' 'a-z')" \
&& apt-get update \
&& apt-get install \
clang-15 \
llvm-15 \
clang-tidy-15 \
--yes --no-install-recommends \
&& apt-get clean

View File

@ -1,4 +1,5 @@
#!/bin/bash
# shellcheck disable=SC2024
set -e -x -a -u
@ -9,7 +10,7 @@ cd hadoop-3.3.1
export JAVA_HOME=/usr
mkdir -p target/test/data
chown clickhouse ./target/test/data
sudo -E -u clickhouse bin/mapred minicluster -format -nomr -nnport 12222 &
sudo -E -u clickhouse bin/mapred minicluster -format -nomr -nnport 12222 >> /test_output/garbage.log 2>&1 &
while ! nc -z localhost 12222; do
sleep 1

View File

@ -31,6 +31,7 @@ ClickHouse Inc does **not** maintain the libraries listed below and hasn't don…
- [chconn](https://github.com/vahid-sohrabloo/chconn)
- [mailrugo-clickhouse](https://github.com/mailru/go-clickhouse)
- [golang-clickhouse](https://github.com/leprosus/golang-clickhouse)
- [uptrace/go-clickhouse](https://clickhouse.uptrace.dev/)
- Swift
- [ClickHouseNIO](https://github.com/patrick-zippenfenig/ClickHouseNIO)
- [ClickHouseVapor ORM](https://github.com/patrick-zippenfenig/ClickHouseVapor)

View File

@ -196,6 +196,18 @@ Features:
The client is available for instant usage through github pages: https://metrico.github.io/clickhouse-mate/
### Uptrace {#uptrace}
[Uptrace](https://github.com/uptrace/uptrace) is an APM tool that provides distributed tracing and metrics powered by OpenTelemetry and ClickHouse.
Features:
- [OpenTelemetry tracing](https://uptrace.dev/opentelemetry/distributed-tracing.html), metrics, and logs.
- Email/Slack/PagerDuty notifications using AlertManager.
- SQL-like query language to aggregate spans.
- PromQL-like language to query metrics.
- Pre-built metrics dashboards.
- Multiple users/projects via YAML config.
## Commercial {#commercial}

View File

@ -3242,7 +3242,7 @@ Possible values:
- Positive integer.
- 0 — Asynchronous insertions are disabled.
Default value: `1000000`.
Default value: `100000`.
## async_insert_busy_timeout_ms {#async-insert-busy-timeout-ms}

View File

@ -11,6 +11,8 @@ namespace DB
class BackupEntryFromAppendOnlyFile : public BackupEntryFromImmutableFile
{
public:
/// The constructor is allowed to not set `file_size_` or `checksum_`; in that case they will be calculated from the data.
BackupEntryFromAppendOnlyFile(
const DiskPtr & disk_,
const String & file_path_,

View File

@ -4,8 +4,6 @@
#include <base/defines.h>
#include <mutex>
namespace Poco { class TemporaryFile; }
namespace DB
{
class TemporaryFileOnDisk;
@ -16,6 +14,8 @@ using DiskPtr = std::shared_ptr<IDisk>;
class BackupEntryFromImmutableFile : public IBackupEntry
{
public:
/// The constructor is allowed to not set `file_size_` or `checksum_`; in that case they will be calculated from the data.
BackupEntryFromImmutableFile(
const DiskPtr & disk_,
const String & file_path_,
@ -40,7 +40,6 @@ private:
mutable std::optional<UInt64> file_size TSA_GUARDED_BY(get_file_size_mutex);
mutable std::mutex get_file_size_mutex;
const std::optional<UInt128> checksum;
const std::shared_ptr<Poco::TemporaryFile> temporary_file;
const std::shared_ptr<TemporaryFileOnDisk> temporary_file_on_disk;
};

View File

@ -29,6 +29,10 @@
M(PostgreSQLConnection, "Number of client connections using PostgreSQL protocol") \
M(OpenFileForRead, "Number of files open for reading") \
M(OpenFileForWrite, "Number of files open for writing") \
M(TotalTemporaryFiles, "Number of temporary files created") \
M(TemporaryFilesForSort, "Number of temporary files created for external sorting") \
M(TemporaryFilesForAggregation, "Number of temporary files created for external aggregation") \
M(TemporaryFilesForJoin, "Number of temporary files created for JOIN") \
M(Read, "Number of read (read, pread, io_getevents, etc.) syscalls in fly") \
M(Write, "Number of write (write, pwrite, io_getevents, etc.) syscalls in fly") \
M(NetworkReceive, "Number of threads receiving data from network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \

View File

@ -119,18 +119,27 @@
\
M(ExecuteShellCommand, "Number of shell command executions.") \
\
M(ExternalProcessingCompressedBytesTotal, "Number of compressed bytes written by external processing (sorting/aggregating/joining)") \
M(ExternalProcessingUncompressedBytesTotal, "Amount of data (uncompressed, before compression) written by external processing (sorting/aggregating/joining)") \
M(ExternalProcessingFilesTotal, "Number of files used by external processing (sorting/aggregating/joining)") \
M(ExternalSortWritePart, "Number of times a temporary file was written to disk for sorting in external memory.") \
M(ExternalSortMerge, "Number of times temporary files were merged for sorting in external memory.") \
M(ExternalSortCompressedBytes, "Number of compressed bytes written for sorting in external memory.") \
M(ExternalSortUncompressedBytes, "Amount of data (uncompressed, before compression) written for sorting in external memory.") \
M(ExternalAggregationWritePart, "Number of times a temporary file was written to disk for aggregation in external memory.") \
M(ExternalAggregationMerge, "Number of times temporary files were merged for aggregation in external memory.") \
M(ExternalAggregationCompressedBytes, "Number of bytes written to disk for aggregation in external memory.") \
M(ExternalAggregationUncompressedBytes, "Amount of data (uncompressed, before compression) written to disk for aggregation in external memory.") \
M(ExternalJoinWritePart, "Number of times a temporary file was written to disk for join in external memory.") \
M(ExternalJoinMerge, "Number of times temporary files were merged for join in external memory.") \
M(ExternalJoinCompressedBytes, "Number of compressed bytes written for join in external memory.") \
M(ExternalJoinUncompressedBytes, "Amount of data (uncompressed, before compression) written for join in external memory.") \
\
M(SlowRead, "Number of reads from a file that were slow. This indicates system overload. Thresholds are controlled by read_backoff_* settings.") \
M(ReadBackoff, "Number of times the number of query processing threads was lowered due to slow reads.") \
\
M(ReplicaPartialShutdown, "How many times Replicated table has to deinitialize its state due to session expiration in ZooKeeper. The state is reinitialized every time when ZooKeeper is available again.") \
\
\
M(SelectedParts, "Number of data parts selected to read from a MergeTree table.") \
M(SelectedRanges, "Number of (non-adjacent) ranges in all data parts selected to read from a MergeTree table.") \
M(SelectedMarks, "Number of marks (index granules) selected to read from a MergeTree table.") \
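
The new counters land in the standard introspection tables; a minimal sketch (assuming a server built with this change) to observe them:

```sql
-- Current number of temporary files, total and split by purpose (sort/aggregation/join).
SELECT metric, value FROM system.metrics WHERE metric LIKE '%TemporaryFiles%';

-- Cumulative spill counters; system.events only lists events that have fired at least once.
SELECT event, value FROM system.events
WHERE event LIKE 'ExternalProcessing%' OR event LIKE 'ExternalSort%'
   OR event LIKE 'ExternalAggregation%' OR event LIKE 'ExternalJoin%';
```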

View File

@ -5,12 +5,22 @@
namespace DB
{
inline size_t encodeBase58(const char8_t * src, char8_t * dst)
inline size_t encodeBase58(const char8_t * src, size_t srclen, char8_t * dst)
{
const char * base58_encoding_alphabet = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz";
size_t processed = 0;
size_t zeros = 0;
for (;*src == '\0' && processed < srclen-1; ++src)
{
++processed;
++zeros;
*dst++ = '1';
}
size_t idx = 0;
for (; *src; ++src)
while (processed < srclen-1)
{
unsigned int carry = static_cast<unsigned char>(*src);
for (size_t j = 0; j < idx; ++j)
@ -24,6 +34,8 @@ inline size_t encodeBase58(const char8_t * src, char8_t * dst)
dst[idx++] = static_cast<unsigned char>(carry % 58);
carry /= 58;
}
++src;
++processed;
}
size_t c_idx = idx >> 1;
@ -37,23 +49,38 @@ inline size_t encodeBase58(const char8_t * src, char8_t * dst)
{
dst[c_idx] = base58_encoding_alphabet[static_cast<unsigned char>(dst[c_idx])];
}
dst[idx] = '\0';
return idx + 1;
return zeros + idx + 1;
}
inline size_t decodeBase58(const char8_t * src, char8_t * dst)
inline size_t decodeBase58(const char8_t * src, size_t srclen, char8_t * dst)
{
const signed char uint_max = UINT_MAX;
const signed char map_digits[128]
= {uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max,
uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max,
uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max,
uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, 0, 1, 2, 3, 4, 5, 6, 7, 8, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, 9, 10, 11, 12, 13, 14, 15, 16, uint_max, 17, 18, 19, 20, 21, uint_max, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32,
uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, uint_max, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, uint_max, uint_max, uint_max, uint_max, uint_max};
uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, 0, 1, 2,
3, 4, 5, 6, 7, 8, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, uint_max,
9, 10, 11, 12, 13, 14, 15, 16, uint_max, 17, 18, 19, 20,
21, uint_max, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32,
uint_max, uint_max, uint_max, uint_max, uint_max, uint_max, 33, 34, 35, 36, 37, 38, 39,
40, 41, 42, 43, uint_max, 44, 45, 46, 47, 48, 49, 50, 51,
52, 53, 54, 55, 56, 57, uint_max, uint_max, uint_max, uint_max, uint_max};
size_t processed = 0;
size_t zeros = 0;
for (;*src == '1' && processed < srclen-1; ++src)
{
++processed;
++zeros;
*dst++ = '\0';
}
size_t idx = 0;
for (; *src; ++src)
while (processed < srclen-1)
{
unsigned int carry = map_digits[*src];
if (unlikely(carry == UINT_MAX))
@ -71,6 +98,8 @@ inline size_t decodeBase58(const char8_t * src, char8_t * dst)
dst[idx++] = static_cast<unsigned char>(carry & 0xff);
carry >>= 8;
}
++src;
++processed;
}
size_t c_idx = idx >> 1;
@ -81,7 +110,7 @@ inline size_t decodeBase58(const char8_t * src, char8_t * dst)
dst[idx - (i + 1)] = s;
}
dst[idx] = '\0';
return idx + 1;
return zeros + idx + 1;
}
}
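
The point of passing `srclen` and of the new leading-character loops is that Base58 encodes every leading zero byte as a literal '1', so those bytes must be handled outside the base-conversion loop. The SQL tests added later in this commit check the round trip directly:

```sql
-- A value whose decoded form starts with a zero byte keeps its leading '1' when re-encoded.
SELECT base58Encode(base58Decode('1BWutmTvYPwDtmw9abTkS4Ssr8no61spGAvW1X6NDix'))
    == '1BWutmTvYPwDtmw9abTkS4Ssr8no61spGAvW1X6NDix';
```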

View File

@ -16,9 +16,17 @@
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Disks/IDisk.h>
namespace fs = std::filesystem;
namespace ProfileEvents
{
extern const Event ExternalProcessingFilesTotal;
}
namespace DB
{
@ -34,7 +42,6 @@ namespace ErrorCodes
extern const int CANNOT_CREATE_FILE;
}
struct statvfs getStatVFS(const String & path)
{
struct statvfs fs;
@ -47,18 +54,20 @@ struct statvfs getStatVFS(const String & path)
return fs;
}
bool enoughSpaceInDirectory(const std::string & path [[maybe_unused]], size_t data_size [[maybe_unused]])
bool enoughSpaceInDirectory(const std::string & path, size_t data_size)
{
auto free_space = fs::space(path).free;
fs::path filepath(path);
/// `path` may point to a non-existent file; in that case we can't check it directly, so move to the parent directory
while (filepath.has_parent_path() && !fs::exists(filepath))
filepath = filepath.parent_path();
auto free_space = fs::space(filepath).free;
return data_size <= free_space;
}
std::unique_ptr<TemporaryFile> createTemporaryFile(const std::string & path)
{
ProfileEvents::increment(ProfileEvents::ExternalProcessingFilesTotal);
fs::create_directories(path);
/// NOTE: std::make_shared cannot use protected constructors
return std::make_unique<TemporaryFile>(path);
}

View File

@ -19,6 +19,7 @@ using TemporaryFile = Poco::TemporaryFile;
bool enoughSpaceInDirectory(const std::string & path, size_t data_size);
std::unique_ptr<TemporaryFile> createTemporaryFile(const std::string & path);
// Determine what block device is responsible for specified path
#if !defined(OS_LINUX)
[[noreturn]]

View File

@ -64,8 +64,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Seconds, connect_timeout, DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, "Connection timeout if there are no replicas.", 0) \
M(Milliseconds, connect_timeout_with_failover_ms, 50, "Connection timeout for selecting first healthy replica.", 0) \
M(Milliseconds, connect_timeout_with_failover_secure_ms, 100, "Connection timeout for selecting first healthy replica (for secure connections).", 0) \
M(Seconds, receive_timeout, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, "", 0) \
M(Seconds, send_timeout, DBMS_DEFAULT_SEND_TIMEOUT_SEC, "", 0) \
M(Seconds, receive_timeout, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, "Timeout for receiving data from the network, in seconds. If no bytes were received in this interval, an exception is thrown. If you set this setting on the client, the 'send_timeout' for the socket will also be set on the corresponding connection end on the server.", 0) \
M(Seconds, send_timeout, DBMS_DEFAULT_SEND_TIMEOUT_SEC, "Timeout for sending data to the network, in seconds. If the client needs to send some data but is not able to send any bytes in this interval, an exception is thrown. If you set this setting on the client, the 'receive_timeout' for the socket will also be set on the corresponding connection end on the server.", 0) \
M(Seconds, drain_timeout, 3, "Timeout for draining remote connections, -1 means synchronous drain without ignoring errors", 0) \
M(Seconds, tcp_keep_alive_timeout, 290 /* less than DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC */, "The time in seconds the connection needs to remain idle before TCP starts sending keepalive probes", 0) \
M(Milliseconds, hedged_connection_timeout_ms, 100, "Connection timeout for establishing connection with replica for Hedged requests", 0) \
@ -140,8 +140,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, group_by_use_nulls, false, "Treat columns mentioned in ROLLUP, CUBE or GROUPING SETS as Nullable", 0) \
\
M(UInt64, max_parallel_replicas, 1, "The maximum number of replicas of each shard used when the query is executed. For consistency (to get different parts of the same partition), this option only works for the specified sampling key. The lag of the replicas is not controlled.", 0) \
M(UInt64, parallel_replicas_count, 0, "", 0) \
M(UInt64, parallel_replica_offset, 0, "", 0) \
M(UInt64, parallel_replicas_count, 0, "This is an internal setting that should not be used directly and represents an implementation detail of the 'parallel replicas' mode. This setting is automatically set by the initiator server for distributed queries to the number of parallel replicas participating in query processing.", 0) \
M(UInt64, parallel_replica_offset, 0, "This is an internal setting that should not be used directly and represents an implementation detail of the 'parallel replicas' mode. This setting is automatically set by the initiator server for distributed queries to the index of the replica participating in query processing among the parallel replicas.", 0) \
\
M(Bool, allow_experimental_parallel_reading_from_replicas, false, "If true, ClickHouse will send a SELECT query to all replicas of a table. It will work for any kind of MergeTree table.", 0) \
\
@ -214,7 +214,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, insert_deduplicate, true, "For INSERT queries in the replicated table, specifies that deduplication of inserted blocks should be performed", 0) \
\
M(UInt64Auto, insert_quorum, 0, "For INSERT queries in the replicated table, wait writing for the specified number of replicas and linearize the addition of the data. 0 - disabled.", 0) \
M(Milliseconds, insert_quorum_timeout, 600000, "", 0) \
M(Milliseconds, insert_quorum_timeout, 600000, "If the quorum of replicas is not reached within the specified time (in milliseconds), an exception is thrown and the insertion is aborted.", 0) \
M(Bool, insert_quorum_parallel, true, "For quorum INSERT queries - enable to make parallel inserts without linearizability", 0) \
M(UInt64, select_sequential_consistency, 0, "For SELECT queries from the replicated table, throw an exception if the replica does not have a chunk written with the quorum; do not read the parts that have not yet been written with the quorum.", 0) \
M(UInt64, table_function_remote_max_addresses, 1000, "The maximum number of different shards and the maximum number of replicas of one shard in the `remote` function.", 0) \
@ -251,7 +251,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(JoinStrictness, join_default_strictness, JoinStrictness::All, "Set default strictness in JOIN query. Possible values: empty string, 'ANY', 'ALL'. If empty, query without strictness will throw exception.", 0) \
M(Bool, any_join_distinct_right_table_keys, false, "Enable old ANY JOIN logic with many-to-one left-to-right table keys mapping for all ANY JOINs. It leads to confusing not equal results for 't1 ANY LEFT JOIN t2' and 't2 ANY RIGHT JOIN t1'. ANY RIGHT JOIN needs one-to-many keys mapping to be consistent with LEFT one.", IMPORTANT) \
\
M(UInt64, preferred_block_size_bytes, 1000000, "", 0) \
M(UInt64, preferred_block_size_bytes, 1000000, "This setting adjusts the data block size for query processing and provides finer-grained control compared to the coarser 'max_block_size' setting. If the columns are large and, with 'max_block_size' rows, the block size would be larger than the specified number of bytes, its size will be lowered for better CPU cache locality.", 0) \
\
M(UInt64, max_replica_delay_for_distributed_queries, 300, "If set, distributed queries of Replicated tables will choose servers with replication delay in seconds less than the specified value (not inclusive). Zero means do not take delay into account.", 0) \
M(Bool, fallback_to_stale_replicas_for_distributed_queries, true, "Suppose max_replica_delay_for_distributed_queries is set and all replicas for the queried table are stale. If this setting is enabled, the query will be performed anyway, otherwise the error will be reported.", 0) \
@ -316,14 +316,14 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, max_bytes_to_read_leaf, 0, "Limit on read bytes (after decompression) on the leaf nodes for distributed queries. Limit is applied for local reads only excluding the final merge stage on the root node.", 0) \
M(OverflowMode, read_overflow_mode_leaf, OverflowMode::THROW, "What to do when the leaf limit is exceeded.", 0) \
\
M(UInt64, max_rows_to_group_by, 0, "", 0) \
M(UInt64, max_rows_to_group_by, 0, "If aggregation during GROUP BY generates more than the specified number of rows (unique GROUP BY keys), the behavior is determined by 'group_by_overflow_mode', which by default throws an exception but can also be switched to an approximate GROUP BY mode.", 0) \
M(OverflowModeGroupBy, group_by_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
M(UInt64, max_bytes_before_external_group_by, 0, "", 0) \
M(UInt64, max_bytes_before_external_group_by, 0, "If memory usage during a GROUP BY operation exceeds this threshold in bytes, activate the 'external aggregation' mode (spill data to disk). The recommended value is half of the available system memory.", 0) \
\
M(UInt64, max_rows_to_sort, 0, "", 0) \
M(UInt64, max_bytes_to_sort, 0, "", 0) \
M(UInt64, max_rows_to_sort, 0, "If more than the specified number of records has to be processed for an ORDER BY operation, the behavior is determined by 'sort_overflow_mode', which by default throws an exception.", 0) \
M(UInt64, max_bytes_to_sort, 0, "If more than the specified number of (uncompressed) bytes has to be processed for an ORDER BY operation, the behavior is determined by 'sort_overflow_mode', which by default throws an exception.", 0) \
M(OverflowMode, sort_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
M(UInt64, max_bytes_before_external_sort, 0, "", 0) \
M(UInt64, max_bytes_before_external_sort, 0, "If memory usage during an ORDER BY operation exceeds this threshold in bytes, activate the 'external sorting' mode (spill data to disk). The recommended value is half of the available system memory.", 0) \
M(UInt64, max_bytes_before_remerge_sort, 1000000000, "In case of ORDER BY with LIMIT, when memory usage is higher than specified threshold, perform additional steps of merging blocks before final merge to keep just top LIMIT rows.", 0) \
M(Float, remerge_sort_lowered_memory_bytes_ratio, 2., "If memory usage after remerge does not reduced by this ratio, remerge will be disabled.", 0) \
\
@ -332,7 +332,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(OverflowMode, result_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
\
/* TODO: Check also when merging and finalizing aggregate functions. */ \
M(Seconds, max_execution_time, 0, "", 0) \
M(Seconds, max_execution_time, 0, "If the query run time exceeds the specified number of seconds, the behavior is determined by 'timeout_overflow_mode', which by default throws an exception. Note that the timeout is checked and the query can stop only at designated places during data processing; it currently cannot stop during merging of aggregation states or during query analysis, so the actual run time may be higher than the value of this setting.", 0) \
M(OverflowMode, timeout_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
\
M(UInt64, min_execution_speed, 0, "Minimum number of execution rows per second.", 0) \
@ -341,12 +341,12 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, max_execution_speed_bytes, 0, "Maximum number of execution bytes per second.", 0) \
M(Seconds, timeout_before_checking_execution_speed, 10, "Check that the speed is not too low after the specified time has elapsed.", 0) \
\
M(UInt64, max_columns_to_read, 0, "", 0) \
M(UInt64, max_temporary_columns, 0, "", 0) \
M(UInt64, max_temporary_non_const_columns, 0, "", 0) \
M(UInt64, max_columns_to_read, 0, "If a query requires reading more than the specified number of columns, an exception is thrown. Zero means unlimited. This setting is useful to prevent overly complex queries.", 0) \
M(UInt64, max_temporary_columns, 0, "If a query generates more than the specified number of temporary columns in memory as a result of intermediate calculation, an exception is thrown. Zero means unlimited. This setting is useful to prevent overly complex queries.", 0) \
M(UInt64, max_temporary_non_const_columns, 0, "Similar to the 'max_temporary_columns' setting but applies only to non-constant columns. This makes sense, because constant columns are cheap and it is reasonable to allow more of them.", 0) \
\
M(UInt64, max_subquery_depth, 100, "", 0) \
M(UInt64, max_pipeline_depth, 1000, "", 0) \
M(UInt64, max_subquery_depth, 100, "If a query has more than the specified number of nested subqueries, throw an exception. This provides a sanity check to protect the users of your cluster from overly complex queries.", 0) \
M(UInt64, max_pipeline_depth, 1000, "If a query has more than the specified number of stages in the query pipeline, throw an exception. The pipeline has stages for every relational operator. This allows limiting the complexity of queries.", 0) \
M(UInt64, max_ast_depth, 1000, "Maximum depth of query syntax tree. Checked after parsing.", 0) \
M(UInt64, max_ast_elements, 50000, "Maximum size of query syntax tree in number of nodes. Checked after parsing.", 0) \
M(UInt64, max_expanded_ast_elements, 500000, "Maximum size of query syntax tree in number of nodes after expansion of aliases and the asterisk.", 0) \
@ -592,7 +592,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, enable_filesystem_cache, true, "Use cache for remote filesystem. This setting does not turn on/off cache for disks (that must be done via disk config), but allows bypassing the cache for some queries if intended", 0) \
M(Bool, enable_filesystem_cache_on_write_operations, false, "Write into cache on write operations. For this setting to actually work, it must also be added to the disk config", 0) \
M(Bool, enable_filesystem_cache_log, false, "Allows to record the filesystem caching log for each query", 0) \
M(Bool, read_from_filesystem_cache_if_exists_otherwise_bypass_cache, false, "", 0) \
M(Bool, read_from_filesystem_cache_if_exists_otherwise_bypass_cache, false, "Allows using the filesystem cache in passive mode: benefit from existing cache entries, but don't put new entries into the cache. If you enable this setting for heavy ad-hoc queries and leave it disabled for short real-time queries, this avoids cache thrashing by overly heavy queries and improves overall system efficiency.", 0) \
M(Bool, enable_filesystem_cache_on_lower_level, true, "If the read buffer supports caching inside the threadpool, allow it to do so, otherwise cache outside of the threadpool. Do not use this setting, it is needed only for testing", 0) \
M(Bool, skip_download_if_exceeds_query_cache, true, "Skip download from remote filesystem if exceeds query cache size", 0) \
M(UInt64, max_query_cache_size, (128UL * 1024 * 1024 * 1024), "Max remote filesystem cache size that can be used by a single query", 0) \
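
As a quick illustration of the external-memory settings documented above, the new 02402 test in this commit spills sorting to disk roughly like this (values borrowed from that test):

```sql
SET max_bytes_before_external_sort = 33554432;   -- spill ORDER BY data to disk above ~32 MiB
SET max_bytes_before_external_group_by = '100M'; -- spill GROUP BY state to disk above 100 MB
SELECT number
FROM (SELECT number FROM numbers(2097152))
ORDER BY number * 1234567890123456789
LIMIT 2097142, 10
FORMAT Null;
```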

View File

@ -35,9 +35,9 @@ static DataTypePtr convertSQLiteDataType(String type)
res = std::make_shared<DataTypeInt8>();
else if (type == "smallint")
res = std::make_shared<DataTypeInt16>();
else if (type.starts_with("int") || type == "mediumint")
else if ((type.starts_with("int") && type != "int8") || type == "mediumint")
res = std::make_shared<DataTypeInt32>();
else if (type == "bigint")
else if (type == "bigint" || type == "int8")
res = std::make_shared<DataTypeInt64>();
else if (type == "float")
res = std::make_shared<DataTypeFloat32>();
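
The practical effect shows up in the updated test reference later in this commit: columns declared in SQLite as `bigint` or `int8` now map to `Nullable(Int64)`, while `int2` still matches the generic `int*` rule and stays `Nullable(Int32)`. A minimal sketch, assuming a ClickHouse database named `SQLite` created with `ENGINE = SQLite(...)` over the test file:

```sql
-- Expect f (bigint) -> Nullable(Int64), g (int2) -> Nullable(Int32), h (int8) -> Nullable(Int64).
SHOW CREATE TABLE SQLite.table4;
```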

View File

@ -1,9 +1,8 @@
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Common/getRandomASCIIString.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Poco/TemporaryFile.h>
#include <ranges>
#include <filesystem>

View File

@ -1,17 +1,39 @@
#include <Disks/TemporaryFileOnDisk.h>
#include <Disks/IDisk.h>
#include <Poco/TemporaryFile.h>
#include <Common/CurrentMetrics.h>
namespace ProfileEvents
{
extern const Event ExternalProcessingFilesTotal;
}
namespace DB
{
TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_)
: TemporaryFileOnDisk(disk_, disk_->getPath())
{}
TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, CurrentMetrics::Value metric_scope)
: TemporaryFileOnDisk(disk_)
{
sub_metric_increment.emplace(metric_scope);
}
TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix_)
: disk(disk_)
{
/// Is it possible to use this with a disk other than DiskLocal?
disk->createDirectories(prefix_);
ProfileEvents::increment(ProfileEvents::ExternalProcessingFilesTotal);
/// Do not use the default temporary root path `/tmp/tmpXXXXXX`.
/// The `dummy_prefix` is used to know what to replace with the real prefix.
String dummy_prefix = "a/";
filepath = Poco::TemporaryFile::tempName(dummy_prefix);
dummy_prefix += "tmp";
/// a/tmpXXXXX -> <prefix>XXXXX
assert(filepath.starts_with(dummy_prefix));
filepath.replace(0, dummy_prefix.length(), prefix_);
}

View File

@ -2,28 +2,46 @@
#include <Core/Types.h>
#include <memory>
#include <Disks/IDisk.h>
#include <Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric TotalTemporaryFiles;
}
namespace DB
{
class IDisk;
using DiskPtr = std::shared_ptr<IDisk>;
/// This class helps with the handling of temporary files or directories.
/// A unique name for the temporary file or directory is automatically chosen based on a specified prefix.
/// Optionally can create a directory in the constructor.
/// Create a directory in the constructor.
/// The destructor always removes the temporary file or directory with all contained files.
class TemporaryFileOnDisk
{
public:
explicit TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix_ = "tmp");
explicit TemporaryFileOnDisk(const DiskPtr & disk_);
explicit TemporaryFileOnDisk(const DiskPtr & disk_, CurrentMetrics::Value metric_scope);
explicit TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix_);
~TemporaryFileOnDisk();
DiskPtr getDisk() const { return disk; }
const String & getPath() const { return filepath; }
const String & path() const { return filepath; }
private:
DiskPtr disk;
String filepath;
CurrentMetrics::Increment metric_increment{CurrentMetrics::TotalTemporaryFiles};
/// Specified if we know what the file is used for (sort/aggregate/join).
std::optional<CurrentMetrics::Increment> sub_metric_increment = {};
};
using TemporaryFileOnDiskHolder = std::unique_ptr<TemporaryFileOnDisk>;
}

View File

@ -25,7 +25,7 @@ TemporaryFileStream::TemporaryFileStream(const std::string & path, const Block &
{}
/// Flush data from input stream into file for future reading
void TemporaryFileStream::write(const std::string & path, const Block & header, QueryPipelineBuilder builder, const std::string & codec)
TemporaryFileStream::Stat TemporaryFileStream::write(const std::string & path, const Block & header, QueryPipelineBuilder builder, const std::string & codec)
{
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf, CompressionCodecFactory::instance().get(codec, {}));
@ -39,6 +39,7 @@ void TemporaryFileStream::write(const std::string & path, const Block & header,
output.write(block);
compressed_buf.finalize();
return Stat{compressed_buf.getCompressedBytes(), compressed_buf.getUncompressedBytes()};
}
}

View File

@ -12,6 +12,12 @@ namespace DB
/// To read the data that was flushed into the temporary data file.
struct TemporaryFileStream
{
struct Stat
{
size_t compressed_bytes = 0;
size_t uncompressed_bytes = 0;
};
ReadBufferFromFile file_in;
CompressedReadBuffer compressed_in;
std::unique_ptr<NativeReader> block_in;
@ -20,7 +26,7 @@ struct TemporaryFileStream
TemporaryFileStream(const std::string & path, const Block & header_);
/// Flush data from input stream into file for future reading
static void write(const std::string & path, const Block & header, QueryPipelineBuilder builder, const std::string & codec);
static Stat write(const std::string & path, const Block & header, QueryPipelineBuilder builder, const std::string & codec);
};
}

View File

@ -48,7 +48,7 @@ struct Base58Encode
for (size_t row = 0; row < input_rows_count; ++row)
{
size_t srclen = src_offsets[row] - src_offset_prev;
auto encoded_size = encodeBase58(src, dst_pos);
auto encoded_size = encodeBase58(src, srclen, dst_pos);
src += srclen;
dst_pos += encoded_size;
@ -90,7 +90,7 @@ struct Base58Decode
{
size_t srclen = src_offsets[row] - src_offset_prev;
auto decoded_size = decodeBase58(src, dst_pos);
auto decoded_size = decodeBase58(src, srclen, dst_pos);
if (!decoded_size)
throw Exception("Invalid Base58 value, cannot be decoded", ErrorCodes::BAD_ARGUMENTS);

View File

@ -43,7 +43,7 @@ public:
size_t getNumberOfArguments() const override { return 2; }
bool isInjective(const ColumnsWithTypeAndName &) const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{

View File

@ -23,12 +23,14 @@
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/JSONBuilder.h>
#include <Common/filesystemHelpers.h>
#include <AggregateFunctions/AggregateFunctionArray.h>
#include <AggregateFunctions/AggregateFunctionState.h>
#include <IO/Operators.h>
#include <Interpreters/JIT/compileFunction.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Core/ProtocolDefines.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Parsers/ASTSelectQuery.h>
@ -37,6 +39,8 @@ namespace ProfileEvents
extern const Event ExternalAggregationWritePart;
extern const Event ExternalAggregationCompressedBytes;
extern const Event ExternalAggregationUncompressedBytes;
extern const Event ExternalProcessingCompressedBytesTotal;
extern const Event ExternalProcessingUncompressedBytesTotal;
extern const Event AggregationPreallocatedElementsInHashTables;
extern const Event AggregationHashTablesInitializedAsTwoLevel;
extern const Event OverflowThrow;
@ -44,6 +48,11 @@ namespace ProfileEvents
extern const Event OverflowAny;
}
namespace CurrentMetrics
{
extern const Metric TemporaryFilesForAggregation;
}
namespace
{
/** Collects observed HashMap-s sizes to avoid redundant intermediate resizes.
@ -1469,40 +1478,26 @@ bool Aggregator::executeOnBlock(Columns columns,
&& worth_convert_to_two_level)
{
size_t size = current_memory_usage + params.min_free_disk_space;
std::string tmp_path = params.tmp_volume->getDisk()->getPath();
// enoughSpaceInDirectory() is not enough to make it right, since
// another process (or another thread of aggregator) can consume all
// space.
//
// But true reservation (IVolume::reserve()) cannot be used here since
// current_memory_usage does not takes compression into account and
// will reserve way more that actually will be used.
//
// Hence let's do a simple check.
if (!enoughSpaceInDirectory(tmp_path, size))
throw Exception("Not enough space for external aggregation in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
writeToTemporaryFile(result, tmp_path);
writeToTemporaryFile(result, size);
}
return true;
}
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path) const
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t max_temp_file_size) const
{
Stopwatch watch;
size_t rows = data_variants.size();
auto file = createTemporaryFile(tmp_path);
const std::string & path = file->path();
auto file = createTempFile(max_temp_file_size);
const auto & path = file->path();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);
NativeWriter block_out(compressed_buf, DBMS_TCP_PROTOCOL_VERSION, getHeader(false));
LOG_DEBUG(log, "Writing part of aggregation data into temporary file {}.", path);
LOG_DEBUG(log, "Writing part of aggregation data into temporary file {}", path);
ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart);
/// Flush only two-level data and possibly overflow data.
@ -1545,6 +1540,8 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
ProfileEvents::increment(ProfileEvents::ExternalAggregationCompressedBytes, compressed_bytes);
ProfileEvents::increment(ProfileEvents::ExternalAggregationUncompressedBytes, uncompressed_bytes);
ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, compressed_bytes);
ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, uncompressed_bytes);
LOG_DEBUG(log,
"Written part in {:.3f} sec., {} rows, {} uncompressed, {} compressed,"
@ -1563,10 +1560,22 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
}
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants) const
TemporaryFileOnDiskHolder Aggregator::createTempFile(size_t max_temp_file_size) const
{
String tmp_path = params.tmp_volume->getDisk()->getPath();
return writeToTemporaryFile(data_variants, tmp_path);
auto file = std::make_unique<TemporaryFileOnDisk>(params.tmp_volume->getDisk(), CurrentMetrics::TemporaryFilesForAggregation);
// enoughSpaceInDirectory() is not enough to make it right, since
// another process (or another thread of aggregator) can consume all
// space.
//
// But true reservation (IVolume::reserve()) cannot be used here since
// current_memory_usage does not take compression into account and
// will reserve way more than will actually be used.
//
// Hence let's do a simple check.
if (max_temp_file_size > 0 && !enoughSpaceInDirectory(file->getPath(), max_temp_file_size))
throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Not enough space for external aggregation in '{}'", file->path());
return file;
}
@ -2831,22 +2840,7 @@ bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool
&& worth_convert_to_two_level)
{
size_t size = current_memory_usage + params.min_free_disk_space;
std::string tmp_path = params.tmp_volume->getDisk()->getPath();
// enoughSpaceInDirectory() is not enough to make it right, since
// another process (or another thread of aggregator) can consume all
// space.
//
// But true reservation (IVolume::reserve()) cannot be used here since
// current_memory_usage does not takes compression into account and
// will reserve way more that actually will be used.
//
// Hence let's do a simple check.
if (!enoughSpaceInDirectory(tmp_path, size))
throw Exception("Not enough space for external aggregation in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
writeToTemporaryFile(result, tmp_path);
writeToTemporaryFile(result, size);
}
return true;

View File

@ -23,6 +23,7 @@
#include <QueryPipeline/SizeLimits.h>
#include <Disks/SingleDiskVolume.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/AggregationCommon.h>
@ -1058,14 +1059,15 @@ public:
std::vector<Block> convertBlockToTwoLevel(const Block & block) const;
/// For external aggregation.
void writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path) const;
void writeToTemporaryFile(AggregatedDataVariants & data_variants) const;
void writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t max_temp_file_size = 0) const;
TemporaryFileOnDiskHolder createTempFile(size_t max_temp_file_size) const;
bool hasTemporaryFiles() const { return !temporary_files.empty(); }
struct TemporaryFiles
{
std::vector<std::unique_ptr<Poco::TemporaryFile>> files;
std::vector<TemporaryFileOnDiskHolder> files;
size_t sum_size_uncompressed = 0;
size_t sum_size_compressed = 0;
mutable std::mutex mutex;

View File

@ -664,8 +664,8 @@ VolumePtr Context::setTemporaryStorage(const String & path, const String & polic
{
StoragePolicyPtr tmp_policy = getStoragePolicySelector(lock)->get(policy_name);
if (tmp_policy->getVolumes().size() != 1)
throw Exception("Policy " + policy_name + " is used temporary files, such policy should have exactly one volume",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG,
"Policy '{} is used temporary files, such policy should have exactly one volume", policy_name);
shared->tmp_volume = tmp_policy->getVolume(0);
}

View File

@ -2198,7 +2198,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
query_info.syntax_analyzer_result);
}
}
else
else if (optimize_aggregation_in_order)
{
if (query_info.projection)
{

View File

@ -7,27 +7,49 @@
#include <Processors/Sources/TemporaryFileLazySource.h>
#include <Formats/TemporaryFileStream.h>
#include <Disks/IVolume.h>
#include <Disks/TemporaryFileOnDisk.h>
namespace ProfileEvents
{
extern const Event ExternalJoinWritePart;
extern const Event ExternalJoinMerge;
extern const Event ExternalJoinCompressedBytes;
extern const Event ExternalJoinUncompressedBytes;
extern const Event ExternalProcessingCompressedBytesTotal;
extern const Event ExternalProcessingUncompressedBytesTotal;
}
namespace CurrentMetrics
{
extern const Metric TemporaryFilesForJoin;
}
namespace DB
{
namespace
{
std::unique_ptr<TemporaryFile> flushToFile(const String & tmp_path, const Block & header, QueryPipelineBuilder pipeline, const String & codec)
TemporaryFileOnDiskHolder flushToFile(const DiskPtr & disk, const Block & header, QueryPipelineBuilder pipeline, const String & codec)
{
auto tmp_file = createTemporaryFile(tmp_path);
auto tmp_file = std::make_unique<TemporaryFileOnDisk>(disk, CurrentMetrics::TemporaryFilesForJoin);
auto write_stat = TemporaryFileStream::write(tmp_file->getPath(), header, std::move(pipeline), codec);
TemporaryFileStream::write(tmp_file->path(), header, std::move(pipeline), codec);
ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, write_stat.compressed_bytes);
ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, write_stat.uncompressed_bytes);
ProfileEvents::increment(ProfileEvents::ExternalJoinCompressedBytes, write_stat.compressed_bytes);
ProfileEvents::increment(ProfileEvents::ExternalJoinUncompressedBytes, write_stat.uncompressed_bytes);
ProfileEvents::increment(ProfileEvents::ExternalJoinWritePart);
return tmp_file;
}
SortedBlocksWriter::SortedFiles flushToManyFiles(const String & tmp_path, const Block & header, QueryPipelineBuilder builder,
SortedBlocksWriter::SortedFiles flushToManyFiles(const DiskPtr & disk, const Block & header, QueryPipelineBuilder builder,
const String & codec, std::function<void(const Block &)> callback = [](const Block &){})
{
std::vector<std::unique_ptr<TemporaryFile>> files;
std::vector<TemporaryFileOnDiskHolder> files;
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingPipelineExecutor executor(pipeline);
@ -42,7 +64,7 @@ SortedBlocksWriter::SortedFiles flushToManyFiles(const String & tmp_path, const
QueryPipelineBuilder one_block_pipeline;
Chunk chunk(block.getColumns(), block.rows());
one_block_pipeline.init(Pipe(std::make_shared<SourceFromSingleChunk>(block.cloneEmpty(), std::move(chunk))));
auto tmp_file = flushToFile(tmp_path, header, std::move(one_block_pipeline), codec);
auto tmp_file = flushToFile(disk, header, std::move(one_block_pipeline), codec);
files.emplace_back(std::move(tmp_file));
}
@ -116,8 +138,6 @@ void SortedBlocksWriter::insert(Block && block)
SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & blocks) const
{
const std::string path = getPath();
Pipes pipes;
pipes.reserve(blocks.size());
for (const auto & block : blocks)
@ -142,7 +162,7 @@ SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & bloc
pipeline.addTransform(std::move(transform));
}
return flushToFile(path, sample_block, std::move(pipeline), codec);
return flushToFile(volume->getDisk(), sample_block, std::move(pipeline), codec);
}
SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge()
@ -197,7 +217,7 @@ SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge()
pipeline.addTransform(std::move(transform));
}
new_files.emplace_back(flushToFile(getPath(), sample_block, std::move(pipeline), codec));
new_files.emplace_back(flushToFile(volume->getDisk(), sample_block, std::move(pipeline), codec));
}
}
@ -220,6 +240,7 @@ SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<vo
if (pipeline.getNumStreams() > 1)
{
ProfileEvents::increment(ProfileEvents::ExternalJoinMerge);
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
@ -230,7 +251,7 @@ SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<vo
pipeline.addTransform(std::move(transform));
}
return flushToManyFiles(getPath(), sample_block, std::move(pipeline), codec, callback);
return flushToManyFiles(volume->getDisk(), sample_block, std::move(pipeline), codec, callback);
}
Pipe SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const
@ -238,11 +259,6 @@ Pipe SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const
return Pipe(std::make_shared<TemporaryFileLazySource>(file->path(), materializeBlock(sample_block)));
}
String SortedBlocksWriter::getPath() const
{
return volume->getDisk()->getPath();
}
Block SortedBlocksBuffer::exchange(Block && block)
{

View File

@ -8,7 +8,7 @@
#include <Core/SortDescription.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/SizeLimits.h>
#include <Disks/TemporaryFileOnDisk.h>
namespace DB
{
@ -24,7 +24,7 @@ using VolumePtr = std::shared_ptr<IVolume>;
struct SortedBlocksWriter
{
using TmpFilePtr = std::unique_ptr<TemporaryFile>;
using TmpFilePtr = TemporaryFileOnDiskHolder;
using SortedFiles = std::vector<TmpFilePtr>;
struct Blocks

View File

@ -2,6 +2,7 @@
#include <Processors/IAccumulatingTransform.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Common/ProfileEvents.h>
#include <Common/formatReadable.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
@ -15,8 +16,16 @@ namespace ProfileEvents
{
extern const Event ExternalSortWritePart;
extern const Event ExternalSortMerge;
extern const Event ExternalSortCompressedBytes;
extern const Event ExternalSortUncompressedBytes;
extern const Event ExternalProcessingCompressedBytesTotal;
extern const Event ExternalProcessingUncompressedBytesTotal;
}
namespace CurrentMetrics
{
extern const Metric TemporaryFilesForSort;
}
namespace DB
{
@ -50,9 +59,14 @@ public:
{
if (out_stream)
{
out_stream->flush();
compressed_buf_out.next();
file_buf_out.next();
LOG_INFO(log, "Done writing part of data into temporary file {}", path);
auto stat = updateWriteStat();
LOG_INFO(log, "Done writing part of data into temporary file {}, compressed {}, uncompressed {} ",
path, ReadableSize(static_cast<double>(stat.compressed_size)), ReadableSize(static_cast<double>(stat.uncompressed_size)));
out_stream.reset();
@ -76,6 +90,24 @@ public:
}
private:
struct Stat
{
size_t compressed_size = 0;
size_t uncompressed_size = 0;
};
Stat updateWriteStat()
{
Stat res{compressed_buf_out.getCompressedBytes(), compressed_buf_out.getUncompressedBytes()};
ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, res.compressed_size);
ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, res.uncompressed_size);
ProfileEvents::increment(ProfileEvents::ExternalSortCompressedBytes, res.compressed_size);
ProfileEvents::increment(ProfileEvents::ExternalSortUncompressedBytes, res.uncompressed_size);
return res;
}
Poco::Logger * log;
std::string path;
WriteBufferFromFile file_buf_out;
@ -182,8 +214,7 @@ void MergeSortingTransform::consume(Chunk chunk)
if (!reservation)
throw Exception("Not enough space for external sort in temporary storage", ErrorCodes::NOT_ENOUGH_SPACE);
const std::string tmp_path(reservation->getDisk()->getPath());
temporary_files.emplace_back(createTemporaryFile(tmp_path));
temporary_files.emplace_back(std::make_unique<TemporaryFileOnDisk>(reservation->getDisk(), CurrentMetrics::TemporaryFilesForSort));
const std::string & path = temporary_files.back()->path();
merge_sorter
@ -236,7 +267,7 @@ void MergeSortingTransform::generate()
else
{
ProfileEvents::increment(ProfileEvents::ExternalSortMerge);
LOG_INFO(log, "There are {} temporary sorted parts to merge.", temporary_files.size());
LOG_INFO(log, "There are {} temporary sorted parts to merge", temporary_files.size());
processors.emplace_back(std::make_shared<MergeSorterSource>(
header_without_constants, std::move(chunks), description, max_merged_block_size, limit));

View File

@ -3,6 +3,7 @@
#include <Processors/Transforms/SortingTransform.h>
#include <Core/SortDescription.h>
#include <Common/filesystemHelpers.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Common/logger_useful.h>
@ -55,7 +56,7 @@ private:
bool remerge_is_useful = true;
/// Everything below is for external sorting.
std::vector<std::unique_ptr<TemporaryFile>> temporary_files;
std::vector<TemporaryFileOnDiskHolder> temporary_files;
/// Merge all accumulated blocks to keep no more than limit rows.
void remerge();

View File

@ -19,7 +19,6 @@
#include <DataTypes/ObjectUtils.h>
#include <Columns/ColumnObject.h>
#include <DataTypes/hasNullable.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
@ -63,6 +62,7 @@
#include <Common/SimpleIncrement.h>
#include <Common/Stopwatch.h>
#include <Common/StringUtils/StringUtils.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Common/escapeForFileName.h>
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>

View File

@ -1078,7 +1078,7 @@ URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, Contex
}
if (configuration.format == "auto")
configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.url, true);
configuration.format = FormatFactory::instance().getFormatFromFileName(Poco::URI(configuration.url).getPath(), true);
return configuration;
}
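
Deriving the format from `Poco::URI(configuration.url).getPath()` instead of the full URL means query parameters no longer confuse extension-based format detection; the new test added later in this commit exercises exactly that:

```sql
-- '?get=parameterHere' is ignored; the format is inferred from the '.tsv' path extension.
DESC url('http://localhost:8888/test/data.tsv?get=parameterHere', auto, 'x UInt32');
```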

View File

@ -54,7 +54,7 @@ void TableFunctionURL::parseArguments(const ASTPtr & ast_function, ContextPtr co
filename = configuration.url;
format = configuration.format;
if (format == "auto")
format = FormatFactory::instance().getFormatFromFileName(filename, true);
format = FormatFactory::instance().getFormatFromFileName(Poco::URI(filename).getPath(), true);
structure = configuration.structure;
compression_method = configuration.compression_method;
}
@ -118,6 +118,11 @@ ColumnsDescription TableFunctionURL::getActualTableStructure(ContextPtr context)
return parseColumnsListFromString(structure, context);
}
String TableFunctionURL::getFormatFromFirstArgument()
{
return FormatFactory::instance().getFormatFromFileName(Poco::URI(filename).getPath(), true);
}
void registerTableFunctionURL(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionURL>();

View File

@ -32,6 +32,8 @@ private:
const std::string & table_name, const String & compression_method_) const override;
const char * getStorageTypeName() const override { return "URL"; }
String getFormatFromFirstArgument() override;
ReadWriteBufferFromHTTP::HTTPHeaderEntries getHeaders() const;
URLBasedDataSourceConfiguration configuration;

View File

@ -7,6 +7,12 @@
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
<s3_cache>
<type>cache</type>
<disk>s3</disk>
<path>./s3_cache/</path>
<max_size>1000000</max_size>
</s3_cache>
</disks>
<policies>
<s3>
@ -16,6 +22,13 @@
</main>
</volumes>
</s3>
<s3_cache>
<volumes>
<main>
<disk>s3_cache</disk>
</main>
</volumes>
</s3_cache>
</policies>
</storage_configuration>

View File

@ -5,6 +5,8 @@ import string
import pytest
from helpers.cluster import ClickHouseCluster
TABLE_NAME = "s3_test"
@pytest.fixture(scope="module")
def cluster():
@ -58,8 +60,13 @@ def generate_values(date_str, count, sign=1):
def create_table(cluster, additional_settings=None):
create_table_statement = """
CREATE TABLE s3_test ON CLUSTER cluster(
settings = {
"storage_policy": "s3",
}
settings.update(additional_settings)
create_table_statement = f"""
CREATE TABLE {TABLE_NAME} ON CLUSTER cluster(
dt Date,
id Int64,
data String,
@ -67,20 +74,42 @@ def create_table(cluster, additional_settings=None):
) ENGINE=ReplicatedMergeTree()
PARTITION BY dt
ORDER BY (dt, id)
SETTINGS storage_policy='s3'
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}
"""
if additional_settings:
create_table_statement += ","
create_table_statement += additional_settings
list(cluster.instances.values())[0].query(create_table_statement)
def insert(cluster, node_idxs, verify=True):
all_values = ""
for node_idx in node_idxs:
node = cluster.instances["node" + str(node_idx)]
values = generate_values("2020-01-0" + str(node_idx), 4096)
node.query(
f"INSERT INTO {TABLE_NAME} VALUES {values}",
settings={"insert_quorum": 3},
)
if node_idx != 1:
all_values += ","
all_values += values
if verify:
for node_idx in node_idxs:
node = cluster.instances["node" + str(node_idx)]
assert (
node.query(
f"SELECT * FROM {TABLE_NAME} order by dt, id FORMAT Values",
settings={"select_sequential_consistency": 1},
)
== all_values
)
@pytest.fixture(autouse=True)
def drop_table(cluster):
yield
for node in list(cluster.instances.values()):
node.query("DROP TABLE IF EXISTS s3_test")
node.query(f"DROP TABLE IF EXISTS {TABLE_NAME}")
minio = cluster.minio_client
# Remove extra objects to prevent tests cascade failing
@ -95,32 +124,39 @@ def drop_table(cluster):
def test_insert_select_replicated(cluster, min_rows_for_wide_part, files_per_part):
create_table(
cluster,
additional_settings="min_rows_for_wide_part={}".format(min_rows_for_wide_part),
additional_settings={"min_rows_for_wide_part": min_rows_for_wide_part},
)
all_values = ""
for node_idx in range(1, 4):
node = cluster.instances["node" + str(node_idx)]
values = generate_values("2020-01-0" + str(node_idx), 4096)
node.query(
"INSERT INTO s3_test VALUES {}".format(values),
settings={"insert_quorum": 3},
)
if node_idx != 1:
all_values += ","
all_values += values
for node_idx in range(1, 4):
node = cluster.instances["node" + str(node_idx)]
assert (
node.query(
"SELECT * FROM s3_test order by dt, id FORMAT Values",
settings={"select_sequential_consistency": 1},
)
== all_values
)
insert(cluster, node_idxs=[1, 2, 3], verify=True)
minio = cluster.minio_client
assert len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == 3 * (
FILES_OVERHEAD + files_per_part * 3
)
def test_drop_cache_on_cluster(cluster):
create_table(
cluster,
additional_settings={"storage_policy": "s3_cache"},
)
insert(cluster, node_idxs=[1, 2, 3], verify=True)
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
node3 = cluster.instances["node3"]
node1.query(
f"select * from clusterAllReplicas(cluster, default, {TABLE_NAME}) format Null"
)
assert int(node1.query("select count() from system.filesystem_cache")) > 0
assert int(node2.query("select count() from system.filesystem_cache")) > 0
assert int(node3.query("select count() from system.filesystem_cache")) > 0
node1.query("system drop filesystem cache on cluster cluster")
assert int(node1.query("select count() from system.filesystem_cache")) == 0
assert int(node2.query("select count() from system.filesystem_cache")) == 0
assert int(node3.query("select count() from system.filesystem_cache")) == 0
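
For reference, the checks this test performs map to plain SQL; a minimal sketch, assuming the `cluster` cluster and the `s3_test` table with the `s3_cache` policy from the config above:

```sql
-- Read from every replica to populate the filesystem cache, then drop it cluster-wide.
SELECT * FROM clusterAllReplicas(cluster, default, s3_test) FORMAT Null;
SELECT count() FROM system.filesystem_cache;      -- expected > 0 after the read
SYSTEM DROP FILESYSTEM CACHE ON CLUSTER cluster;
SELECT count() FROM system.filesystem_cache;      -- expected 0 afterwards
```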

View File

@ -51,7 +51,7 @@ function create_table()
database=$($CLICKHOUSE_CLIENT -q "select name from system.databases where name like '${CLICKHOUSE_DATABASE}%' order by rand() limit 1")
if [ -z "$database" ]; then return; fi
$CLICKHOUSE_CLIENT --distributed_ddl_task_timeout=0 -q \
"create table $database.rmt_$RANDOM (n int) engine=ReplicatedMergeTree order by tuple() -- suppress $CLICKHOUSE_TEST_ZOOKEEPER_PREFIX" \
"create table $database.rmt_${RANDOM}_${RANDOM}_${RANDOM} (n int) engine=ReplicatedMergeTree order by tuple() -- suppress $CLICKHOUSE_TEST_ZOOKEEPER_PREFIX" \
2>&1| grep -Fa "Exception: " | grep -Fv "Macro 'uuid' and empty arguments" | grep -Fv "Cannot enqueue query" | grep -Fv "ZooKeeper session expired" | grep -Fv UNKNOWN_DATABASE
sleep 0.$RANDOM
done

View File

@ -1,4 +1,4 @@
send_timeout 300 0 \N \N 0 Seconds
send_timeout 300 0 Timeout for sending data to the network, in seconds. If the client needs to send some data but is not able to send any bytes in this interval, an exception is thrown. If you set this setting on the client, the \'receive_timeout\' for the socket will also be set on the corresponding connection end on the server. \N \N 0 Seconds
storage_policy default 0 Name of storage disk policy String
1
1

View File

@ -21,7 +21,7 @@ line3 3
2 text2
3 text3
test types
CREATE TABLE SQLite.table4\n(\n `a` Nullable(Int32),\n `b` Nullable(Int32),\n `c` Nullable(Int8),\n `d` Nullable(Int16),\n `e` Nullable(Int32),\n `bigint` Nullable(String),\n `int2` Nullable(String),\n `int8` Nullable(String)\n)\nENGINE = SQLite
CREATE TABLE SQLite.table4\n(\n `a` Nullable(Int32),\n `b` Nullable(Int32),\n `c` Nullable(Int8),\n `d` Nullable(Int16),\n `e` Nullable(Int32),\n `f` Nullable(Int64),\n `g` Nullable(Int32),\n `h` Nullable(Int64)\n)\nENGINE = SQLite
CREATE TABLE SQLite.table5\n(\n `a` Nullable(String),\n `b` Nullable(String),\n `c` Nullable(Float64),\n `d` Nullable(Float64),\n `e` Nullable(Float64),\n `f` Nullable(Float32)\n)\nENGINE = SQLite
create table engine with table3
CREATE TABLE default.sqlite_table3\n(\n `col1` String,\n `col2` Int32\n)\nENGINE = SQLite

View File

@ -42,7 +42,7 @@ sqlite3 "${DB_PATH}" "INSERT INTO table3 VALUES ('not a null', 2)"
sqlite3 "${DB_PATH}" 'INSERT INTO table3 VALUES (NULL, 3)'
sqlite3 "${DB_PATH}" "INSERT INTO table3 VALUES ('', 4)"
sqlite3 "${DB_PATH}" 'CREATE TABLE table4 (a int, b integer, c tinyint, d smallint, e mediumint, bigint, int2, int8)'
sqlite3 "${DB_PATH}" 'CREATE TABLE table4 (a int, b integer, c tinyint, d smallint, e mediumint, f bigint, g int2, h int8)'
sqlite3 "${DB_PATH}" 'CREATE TABLE table5 (a character(20), b varchar(10), c real, d double, e double precision, f float)'

View File

@ -21,3 +21,6 @@ foo
foob
fooba
foobar
1
1

View File

@ -9,4 +9,7 @@ SELECT base58Decode('Hold my beer...'); -- { serverError 36 }
SELECT base58Decode(encoded) FROM (SELECT base58Encode(val) as encoded FROM (select arrayJoin(['', 'f', 'fo', 'foo', 'foob', 'fooba', 'foobar', 'Hello world!']) val));
SELECT base58Encode(val) FROM (select arrayJoin(['', 'f', 'fo', 'foo', 'foob', 'fooba', 'foobar']) val);
SELECT base58Decode(val) FROM (select arrayJoin(['', '2m', '8o8', 'bQbp', '3csAg9', 'CZJRhmz', 't1Zv2yaZ']) val);
SELECT base58Decode(val) FROM (select arrayJoin(['', '2m', '8o8', 'bQbp', '3csAg9', 'CZJRhmz', 't1Zv2yaZ', '']) val);
SELECT base58Encode(base58Decode('1BWutmTvYPwDtmw9abTkS4Ssr8no61spGAvW1X6NDix')) == '1BWutmTvYPwDtmw9abTkS4Ssr8no61spGAvW1X6NDix';
select base58Encode('\x00\x0b\xe3\xe1\xeb\xa1\x7a\x47\x3f\x89\xb0\xf7\xe8\xe2\x49\x40\xf2\x0a\xeb\x8e\xbc\xa7\x1a\x88\xfd\xe9\x5d\x4b\x83\xb7\x1a\x09') == '1BWutmTvYPwDtmw9abTkS4Ssr8no61spGAvW1X6NDix';

View File

@ -0,0 +1 @@
SELECT name FROM system.settings WHERE length(description) < 10;

View File

@ -0,0 +1 @@
SELECT name FROM system.merge_tree_settings WHERE length(description) < 10;

View File

@ -0,0 +1,3 @@
1
1
1

View File

@ -0,0 +1,78 @@
-- Tags: no-parallel, no-fasttest, long, no-random-settings
SET max_bytes_before_external_sort = 33554432;
set max_block_size = 1048576;
SELECT number FROM (SELECT number FROM numbers(2097152)) ORDER BY number * 1234567890123456789 LIMIT 2097142, 10
SETTINGS log_comment='02402_external_disk_mertrics/sort'
FORMAT Null;
SET max_bytes_before_external_group_by = '100M';
SET max_memory_usage = '410M';
SET group_by_two_level_threshold = '100K';
SET group_by_two_level_threshold_bytes = '50M';
SELECT sum(k), sum(c) FROM (SELECT number AS k, count() AS c FROM (SELECT * FROM system.numbers LIMIT 2097152) GROUP BY k)
SETTINGS log_comment='02402_external_disk_mertrics/aggregation'
FORMAT Null;
SET join_algorithm = 'partial_merge';
SET default_max_bytes_in_join = 0;
SET max_bytes_in_join = 10000000;
SELECT number * 200000 as n, j * 2097152 FROM numbers(5) nums
ANY LEFT JOIN ( SELECT number * 2 AS n, number AS j FROM numbers(1000000) ) js2
USING n
ORDER BY n
SETTINGS log_comment='02402_external_disk_mertrics/join'
FORMAT Null;
SYSTEM FLUSH LOGS;
SELECT
any(ProfileEvents['ExternalProcessingFilesTotal']) >= 1 AND
any(ProfileEvents['ExternalProcessingCompressedBytesTotal']) >= 100000 AND
any(ProfileEvents['ExternalProcessingUncompressedBytesTotal']) >= 100000 AND
any(ProfileEvents['ExternalSortWritePart']) >= 1 AND
any(ProfileEvents['ExternalSortMerge']) >= 1 AND
any(ProfileEvents['ExternalSortCompressedBytes']) >= 100000 AND
any(ProfileEvents['ExternalSortUncompressedBytes']) >= 100000 AND
count() == 1
FROM system.query_log WHERE current_database = currentDatabase()
AND log_comment = '02402_external_disk_mertrics/sort'
AND query ILIKE 'SELECT%2097152%' AND type = 'QueryFinish';
SELECT
any(ProfileEvents['ExternalProcessingFilesTotal']) >= 1 AND
any(ProfileEvents['ExternalProcessingCompressedBytesTotal']) >= 100000 AND
any(ProfileEvents['ExternalProcessingUncompressedBytesTotal']) >= 100000 AND
any(ProfileEvents['ExternalAggregationWritePart']) >= 1 AND
any(ProfileEvents['ExternalAggregationMerge']) >= 1 AND
any(ProfileEvents['ExternalAggregationCompressedBytes']) >= 100000 AND
any(ProfileEvents['ExternalAggregationUncompressedBytes']) >= 100000 AND
count() == 1
FROM system.query_log WHERE current_database = currentDatabase()
AND log_comment = '02402_external_disk_mertrics/aggregation'
AND query ILIKE 'SELECT%2097152%' AND type = 'QueryFinish';
SELECT
any(ProfileEvents['ExternalProcessingFilesTotal']) >= 1 AND
any(ProfileEvents['ExternalProcessingCompressedBytesTotal']) >= 100000 AND
any(ProfileEvents['ExternalProcessingUncompressedBytesTotal']) >= 100000 AND
any(ProfileEvents['ExternalJoinWritePart']) >= 1 AND
any(ProfileEvents['ExternalJoinMerge']) >= 0 AND
any(ProfileEvents['ExternalJoinCompressedBytes']) >= 100000 AND
any(ProfileEvents['ExternalJoinUncompressedBytes']) >= 100000 AND
count() == 1
FROM system.query_log WHERE current_database = currentDatabase()
AND log_comment = '02402_external_disk_mertrics/join'
AND query ILIKE 'SELECT%2097152%' AND type = 'QueryFinish';
-- Do not check values because they may not be recorded, just check existence
SELECT
CurrentMetric_TemporaryFilesForAggregation,
CurrentMetric_TemporaryFilesForJoin,
CurrentMetric_TemporaryFilesForSort
FROM system.metric_log
ORDER BY event_time DESC LIMIT 5
FORMAT Null;

View File

@ -0,0 +1,2 @@
\N
\N

View File

@ -0,0 +1 @@
select if(number < 0, toFixedString(materialize('123'), 2), NULL) from numbers(2);

View File

@ -0,0 +1 @@
x UInt32

View File

@ -0,0 +1 @@
desc url('http://localhost:8888/test/data.tsv?get=parameterHere', auto, 'x UInt32');

View File

@ -86,6 +86,7 @@ EXTERN_TYPES_EXCLUDES=(
CurrentMetrics::Increment
CurrentMetrics::Metric
CurrentMetrics::values
CurrentMetrics::Value
)
for extern_type in ${!EXTERN_TYPES[@]}; do
type_of_extern=${EXTERN_TYPES[$extern_type]}