Merge branch 'master' into fix-fire-hop-window

This commit is contained in:
mergify[bot] 2022-05-05 13:11:34 +00:00 committed by GitHub
commit eba26ec956
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
336 changed files with 5209 additions and 1925 deletions

View File

@ -1,4 +1,6 @@
Checks: '-*,
misc-misleading-bidirectional,
misc-misleading-identifier,
misc-misplaced-const,
misc-redundant-expression,
misc-static-assert,
@ -11,15 +13,19 @@ Checks: '-*,
modernize-avoid-bind,
modernize-loop-convert,
modernize-macro-to-enum,
modernize-make-shared,
modernize-make-unique,
modernize-raw-string-literal,
modernize-redundant-void-arg,
modernize-replace-random-shuffle,
modernize-shrink-to-fit,
modernize-use-bool-literals,
modernize-use-equals-default,
modernize-use-equals-delete,
modernize-use-nullptr,
modernize-use-transparent-functors,
modernize-use-uncaught-exceptions,
modernize-use-using,
performance-faster-string-find,
@ -41,6 +47,7 @@ Checks: '-*,
readability-convert-member-functions-to-static,
readability-delete-null-pointer,
readability-deleted-default,
readability-duplicate-include,
readability-identifier-naming,
readability-inconsistent-declaration-parameter-name,
readability-make-member-function-const,
@ -51,6 +58,7 @@ Checks: '-*,
readability-redundant-control-flow,
readability-redundant-function-ptr-dereference,
readability-redundant-member-init,
readability-redundant-preprocessor,
readability-redundant-smartptr-get,
readability-redundant-string-cstr,
readability-redundant-string-init,
@ -71,6 +79,7 @@ Checks: '-*,
bugprone-incorrect-roundings,
bugprone-infinite-loop,
bugprone-integer-division,
bugprone-lambda-function-name,
bugprone-macro-parentheses,
bugprone-macro-repeated-side-effects,
bugprone-misplaced-operator-in-strlen-in-alloc,
@ -80,15 +89,19 @@ Checks: '-*,
bugprone-multiple-statement-macro,
bugprone-parent-virtual-call,
bugprone-posix-return,
bugprone-redundant-branch-condition,
bugprone-reserved-identifier,
bugprone-shared-ptr-array-mismatch,
bugprone-signed-char-misuse,
bugprone-sizeof-container,
bugprone-sizeof-expression,
bugprone-string-constructor,
bugprone-string-integer-assignment,
bugprone-string-literal-with-embedded-nul,
bugprone-stringview-nullptr,
bugprone-suspicious-enum-usage,
bugprone-suspicious-include,
bugprone-suspicious-memory-comparison,
bugprone-suspicious-memset-usage,
bugprone-suspicious-missing-comma,
bugprone-suspicious-string-compare,

View File

@ -25,7 +25,6 @@
* Introduce format `ProtobufList` (all records as repeated messages in out Protobuf). Closes [#16436](https://github.com/ClickHouse/ClickHouse/issues/16436). [#35152](https://github.com/ClickHouse/ClickHouse/pull/35152) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Add `h3PointDistM`, `h3PointDistKm`, `h3PointDistRads`, `h3GetRes0Indexes`, `h3GetPentagonIndexes` functions. [#34568](https://github.com/ClickHouse/ClickHouse/pull/34568) ([Bharat Nallan](https://github.com/bharatnc)).
* Add `toLastDayOfMonth` function which rounds up a date or date with time to the last day of the month. [#33501](https://github.com/ClickHouse/ClickHouse/issues/33501). [#34394](https://github.com/ClickHouse/ClickHouse/pull/34394) ([Habibullah Oladepo](https://github.com/holadepo)).
* New aggregation function `groupSortedArray` to obtain an array of the first N values. [#34055](https://github.com/ClickHouse/ClickHouse/pull/34055) ([palegre-tiny](https://github.com/palegre-tiny)).
* Added load balancing setting for \[Zoo\]Keeper client. Closes [#29617](https://github.com/ClickHouse/ClickHouse/issues/29617). [#30325](https://github.com/ClickHouse/ClickHouse/pull/30325) ([小路](https://github.com/nicelulu)).
* Add a new kind of row policy named `simple`. Before this PR we had two kinds of row policies: `permissive` and `restrictive`. A `simple` row policy adds a new filter on a table without any side effects (unlike permissive and restrictive policies). [#35345](https://github.com/ClickHouse/ClickHouse/pull/35345) ([Vitaly Baranov](https://github.com/vitlibar)).
* Added an ability to specify cluster secret in replicated database. [#35333](https://github.com/ClickHouse/ClickHouse/pull/35333) ([Nikita Mikhaylov](https://github.com/nikitamikhaylov)).

View File

@ -20,7 +20,9 @@ if (NOT DEFINED ENV{CLION_IDE} AND NOT DEFINED ENV{XCODE_IDE})
endif()
# Check if environment is polluted.
if (DEFINED ENV{CFLAGS} OR DEFINED ENV{CXXFLAGS} OR DEFINED ENV{LDFLAGS}
if (NOT $ENV{CFLAGS} STREQUAL ""
OR NOT $ENV{CXXFLAGS} STREQUAL ""
OR NOT $ENV{LDFLAGS} STREQUAL ""
OR CMAKE_C_FLAGS OR CMAKE_CXX_FLAGS OR CMAKE_EXE_LINKER_FLAGS OR CMAKE_SHARED_LINKER_FLAGS OR CMAKE_MODULE_LINKER_FLAGS
OR CMAKE_C_FLAGS_INIT OR CMAKE_CXX_FLAGS_INIT OR CMAKE_EXE_LINKER_FLAGS_INIT OR CMAKE_SHARED_LINKER_FLAGS_INIT OR CMAKE_MODULE_LINKER_FLAGS_INIT)

View File

@ -1,36 +0,0 @@
#pragma once
#include <memory>
/** Allows making a std::shared_ptr from a T with a protected constructor.
*
* Derive your T class from shared_ptr_helper<T> and add shared_ptr_helper<T> as a friend,
* and you will have a static 'create' method in your class.
*/
template <typename T>
struct shared_ptr_helper
{
template <typename... TArgs>
static std::shared_ptr<T> create(TArgs &&... args)
{
return std::shared_ptr<T>(new T(std::forward<TArgs>(args)...));
}
};
template <typename T>
struct is_shared_ptr
{
static constexpr bool value = false;
};
template <typename T>
struct is_shared_ptr<std::shared_ptr<T>>
{
static constexpr bool value = true;
};
template <typename T>
inline constexpr bool is_shared_ptr_v = is_shared_ptr<T>::value;
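For context, a minimal sketch of how the removed helper was consumed; the class `Foo` is hypothetical, and the helper itself is copied from the deleted header above so the sketch stands alone:

``` cpp
#include <memory>
#include <utility>

template <typename T>
struct shared_ptr_helper
{
    template <typename... TArgs>
    static std::shared_ptr<T> create(TArgs &&... args)
    {
        return std::shared_ptr<T>(new T(std::forward<TArgs>(args)...));
    }
};

/// Hypothetical user: the constructor is protected, so instances can only
/// be obtained through the inherited static create() method.
struct Foo : shared_ptr_helper<Foo>
{
    friend struct shared_ptr_helper<Foo>;
protected:
    Foo() = default;
};

int main()
{
    std::shared_ptr<Foo> foo = Foo::create();   /// OK
    /// Foo direct;                             /// would not compile: protected constructor
    return foo ? 0 : 1;
}
```

In this commit, call sites of this pattern are replaced with plain `std::make_shared`, as seen further down in the diff where `StorageFile::create(...)` becomes `std::make_shared<StorageFile>(...)`.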

contrib/minizip-ng vendored

@ -1 +1 @@
Subproject commit 6cffc951851620e0fac1993be75e4713c334de03
Subproject commit f3d400e999056ca290998b3fd89cc5a74e4b8b58

View File

@ -652,12 +652,16 @@ if args.report == "main":
# Don't show mildly unstable queries, only the very unstable ones we
# treat as errors.
if very_unstable_queries:
if very_unstable_queries > 5:
error_tests += very_unstable_queries
status = "failure"
message_array.append(str(very_unstable_queries) + " unstable")
# FIXME: uncomment the following lines when tests are stable and
# reliable
# if very_unstable_queries > 5:
# error_tests += very_unstable_queries
# status = "failure"
#
# error_tests += slow_average_tests
# FIXME: until here
error_tests += slow_average_tests
if error_tests:
status = "failure"
message_array.insert(0, str(error_tests) + " errors")

View File

@ -131,6 +131,23 @@ ls -la /
clickhouse-client -q "system flush logs" ||:
# Stop server so we can safely read data with clickhouse-local.
# Why do we read data with clickhouse-local?
# Because it's the simplest way to read it when the server has crashed.
if [ "$NUM_TRIES" -gt "1" ]; then
clickhouse-client -q "system shutdown" ||:
sleep 10
else
sudo clickhouse stop ||:
fi
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
clickhouse-client --port 19000 -q "system shutdown" ||:
clickhouse-client --port 29000 -q "system shutdown" ||:
sleep 10
fi
grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server.log ||:
pigz < /var/log/clickhouse-server/clickhouse-server.log > /test_output/clickhouse-server.log.gz &
@ -143,18 +160,17 @@ pigz < /var/log/clickhouse-server/clickhouse-server.log > /test_output/clickhous
# for files >64MB, we want these files to be compressed explicitly
for table in query_log zookeeper_log trace_log transactions_info_log
do
clickhouse-client -q "select * from system.$table format TSVWithNamesAndTypes" | pigz > /test_output/$table.tsv.gz &
clickhouse-local --path /var/lib/clickhouse/ -q "select * from system.$table format TSVWithNamesAndTypes" | pigz > /test_output/$table.tsv.gz ||:
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
clickhouse-client --port 19000 -q "select * from system.$table format TSVWithNamesAndTypes" | pigz > /test_output/$table.1.tsv.gz &
clickhouse-client --port 29000 -q "select * from system.$table format TSVWithNamesAndTypes" | pigz > /test_output/$table.2.tsv.gz &
clickhouse-local --path /var/lib/clickhouse1/ -q "select * from system.$table format TSVWithNamesAndTypes" | pigz > /test_output/$table.1.tsv.gz ||:
clickhouse-local --path /var/lib/clickhouse2/ -q "select * from system.$table format TSVWithNamesAndTypes" | pigz > /test_output/$table.2.tsv.gz ||:
fi
done
wait ||:
# Also export trace log in flamegraph-friendly format.
for trace_type in CPU Memory Real
do
clickhouse-client -q "
clickhouse-local --path /var/lib/clickhouse/ -q "
select
arrayStringConcat((arrayMap(x -> concat(splitByChar('/', addressToLine(x))[-1], '#', demangle(addressToSymbol(x)) ), trace)), ';') AS stack,
count(*) AS samples
@ -164,10 +180,9 @@ do
order by samples desc
settings allow_introspection_functions = 1
format TabSeparated" \
| pigz > "/test_output/trace-log-$trace_type-flamegraph.tsv.gz" &
| pigz > "/test_output/trace-log-$trace_type-flamegraph.tsv.gz" ||:
done
wait ||:
# Compressed (FIXME: remove once only github actions will be left)
rm /var/log/clickhouse-server/clickhouse-server.log

View File

@ -167,6 +167,34 @@ Config is read from multiple files (in XML or YAML format) and merged into singl
For queries and subsystems other than `Server`, the config is accessible via the `Context::getConfigRef()` method. Every subsystem that is capable of reloading its config without a server restart should register itself in the reload callback in the `Server::main()` method. Note that if a newer config has an error, most subsystems will ignore the new config, log warning messages and keep working with the previously loaded config. Due to the nature of `AbstractConfiguration` it is not possible to pass a reference to a specific section, so a `String config_prefix` is usually used instead.
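For illustration, a subsystem reading its own keys through `Context::getConfigRef()` with a `config_prefix` might look roughly like the sketch below; the subsystem name `my_subsystem` and its keys are invented, and the getters are the usual `Poco::Util::AbstractConfiguration` ones:

``` cpp
#include <string>
#include <Interpreters/Context.h>

/// Sketch only: "my_subsystem" and its keys are hypothetical.
void loadMySubsystemConfig(DB::ContextPtr context)
{
    const auto & config = context->getConfigRef();   /// Poco::Util::AbstractConfiguration

    /// A reference to a sub-section cannot be handed out, so keys are
    /// addressed through a prefix string.
    const std::string config_prefix = "my_subsystem";
    auto max_size = config.getUInt64(config_prefix + ".max_size", 1024);
    auto enabled = config.getBool(config_prefix + ".enable", true);

    /// ... apply the values to the subsystem ...
    (void)max_size;
    (void)enabled;
}
```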
## Threads and jobs {#threads-and-jobs}
To execute queries and do side activities, ClickHouse allocates threads from one of the thread pools to avoid frequent thread creation and destruction. There are a few thread pools, which are selected depending on the purpose and structure of a job:
* Server pool for incoming client sessions.
* Global thread pool for general purpose jobs, background activities and standalone threads.
* IO thread pool for jobs that are mostly blocked on some IO and are not CPU-intensive.
* Background pools for periodic tasks.
* Pools for preemptable tasks that can be split into steps.
The server pool is a `Poco::ThreadPool` class instance defined in the `Server::main()` method. It can have at most `max_connections` threads. Every thread is dedicated to a single active connection.
The global thread pool is the `GlobalThreadPool` singleton class. To allocate a thread from it, `ThreadFromGlobalPool` is used. It has an interface similar to `std::thread`, but pulls its thread from the global pool and does all the necessary initializations. It is configured with the following settings:
* `max_thread_pool_size` - limit on thread count in pool.
* `max_thread_pool_free_size` - limit on idle thread count waiting for new jobs.
* `thread_pool_queue_size` - limit on scheduled job count.
The global pool is universal and all pools described below are implemented on top of it. This can be thought of as a hierarchy of pools. Any specialized pool takes its threads from the global pool using the `ThreadPool` class. So the main purpose of any specialized pool is to apply a limit on the number of simultaneous jobs and to do job scheduling. If there are more jobs scheduled than threads in a pool, `ThreadPool` accumulates jobs in a queue with priorities. Each job has an integer priority. The default priority is zero. All jobs with higher priority values are started before any job with a lower priority value. But there is no difference between already executing jobs, thus priority matters only when the pool is overloaded.
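As a rough sketch (not code from this commit) of the two entry points mentioned above, a standalone thread via `ThreadFromGlobalPool` and a specialized `ThreadPool` with prioritized jobs; the pool size and priorities are arbitrary:

``` cpp
#include <Common/ThreadPool.h>

void threadPoolSketch()
{
    /// A standalone thread taken from the global pool; the interface mimics std::thread.
    ThreadFromGlobalPool standalone([] { /* background activity */ });
    standalone.join();

    /// A specialized pool layered on top of the global one:
    /// at most 4 simultaneous jobs, queued with integer priorities.
    ThreadPool pool(/* max_threads = */ 4);
    pool.scheduleOrThrowOnError([] { /* ordinary job, priority 0 */ });
    pool.scheduleOrThrowOnError([] { /* urgent job */ }, /* priority = */ 1);
    pool.wait();   /// block until all queued jobs have finished
}
```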
The IO thread pool is implemented as a plain `ThreadPool` accessible via the `IOThreadPool::get()` method. It is configured in the same way as the global pool, with the `max_io_thread_pool_size`, `max_io_thread_pool_free_size` and `io_thread_pool_queue_size` settings. The main purpose of the IO thread pool is to avoid exhaustion of the global pool with IO jobs, which could prevent queries from fully utilizing the CPU.
For periodic task execution there is the `BackgroundSchedulePool` class. You can register tasks using `BackgroundSchedulePool::TaskHolder` objects and the pool ensures that no task runs two jobs at the same time. It also allows you to postpone task execution to a specific instant in the future or to temporarily deactivate a task. The global `Context` provides a few instances of this class for different purposes. For general purpose tasks `Context::getSchedulePool()` is used.
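A sketch of the pattern described above, assuming the `TaskHolder` API from `Core/BackgroundSchedulePool.h`; the task name and the 5-second interval are invented:

``` cpp
#include <Core/BackgroundSchedulePool.h>
#include <Interpreters/Context.h>

/// Sketch: a self-rescheduling periodic task. The holder must stay alive
/// for as long as the task should keep running.
class SampleFlusher
{
public:
    explicit SampleFlusher(DB::ContextPtr context)
    {
        task = context->getSchedulePool().createTask("SampleFlusher", [this] { run(); });
        task->activateAndSchedule();   /// first run as soon as possible
    }

private:
    void run()
    {
        /// ... do the periodic work ...
        task->scheduleAfter(5000);     /// run again in 5000 ms
    }

    DB::BackgroundSchedulePool::TaskHolder task;
};
```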
There are also specialized thread pools for preemptable tasks. Such an `IExecutableTask` task can be split into an ordered sequence of jobs, called steps. To schedule these tasks in a manner that allows short tasks to be prioritized over long ones, `MergeTreeBackgroundExecutor` is used. As the name suggests, it is used for background MergeTree-related operations such as merges, mutations, fetches and moves. Pool instances are available using `Context::getCommonExecutor()` and other similar methods.
No matter what pool is used for a job, at start a `ThreadStatus` instance is created for this job. It encapsulates all per-thread information: thread id, query id, performance counters, resource consumption and much other useful data. A job can access it via a thread-local pointer with a `CurrentThread::get()` call, so we do not need to pass it to every function.
If a thread is related to query execution, then the most important thing attached to `ThreadStatus` is the query context `ContextPtr`. Every query has its master thread in the server pool. The master thread does the attachment by holding a `ThreadStatus::QueryScope query_scope(query_context)` object. The master thread also creates a thread group represented by a `ThreadGroupStatus` object. Every additional thread that is allocated during this query's execution is attached to its thread group by a `CurrentThread::attachTo(thread_group)` call. Thread groups are used to aggregate profile event counters and track memory consumption by all threads dedicated to a single task (see the `MemoryTracker` and `ProfileEvents::Counters` classes for more information).
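A sketch of the attachment described above for a worker thread spawned during query execution; the worker body is invented, and `CurrentThread::attachTo` / `CurrentThread::detachQuery` are assumed to have the signatures they carry in `Common/CurrentThread.h` at the time of this commit:

``` cpp
#include <Common/CurrentThread.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadStatus.h>

/// Sketch: a worker thread spawned for a query attaches itself to the query's
/// thread group, so its ProfileEvents and memory usage are aggregated with
/// the rest of the query.
void spawnQueryWorker(const DB::ThreadGroupStatusPtr & thread_group)
{
    ThreadFromGlobalPool worker([thread_group]
    {
        DB::CurrentThread::attachTo(thread_group);   /// join the query's thread group

        /// ... do a part of the query's work ...

        DB::CurrentThread::detachQuery();            /// detach before the thread goes back to the pool
    });
    worker.join();
}
```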
## Distributed Query Execution {#distributed-query-execution}
Servers in a cluster setup are mostly independent. You can create a `Distributed` table on one or all servers in a cluster. The `Distributed` table does not store any data itself; it only provides a “view” of all local tables on multiple nodes of a cluster. When you SELECT from a `Distributed` table, it rewrites that query, chooses remote nodes according to load balancing settings, and sends the query to them. The `Distributed` table asks the remote servers to process the query just up to the stage where intermediate results from different servers can be merged. Then it receives the intermediate results and merges them. The distributed table tries to distribute as much work as possible to remote servers and does not send much intermediate data over the network.

View File

@ -9,9 +9,8 @@ ClickHouse can accept and return data in various formats. A format supported for
results of a `SELECT`, and to perform `INSERT`s into a file-backed table.
The supported formats are:
| Format | Input | Output |
|-----------------------------------------------------------------------------------------|-------|--------|
| Format | Input | Output |
|-------------------------------------------------------------------------------------------|-------|-------|
| [TabSeparated](#tabseparated) | ✔ | ✔ |
| [TabSeparatedRaw](#tabseparatedraw) | ✔ | ✔ |
| [TabSeparatedWithNames](#tabseparatedwithnames) | ✔ | ✔ |
@ -69,6 +68,8 @@ The supported formats are:
| [Regexp](#data-format-regexp) | ✔ | ✗ |
| [RawBLOB](#rawblob) | ✔ | ✔ |
| [MsgPack](#msgpack) | ✔ | ✔ |
| [MySQLDump](#mysqldump) | ✔ | ✗ |
You can control some format processing parameters with the ClickHouse settings. For more information read the [Settings](../operations/settings/settings.md) section.
@ -184,7 +185,7 @@ Differs from the `TabSeparated` format in that the column names are written in t
During parsing, the first row is expected to contain the column names. You can use column names to determine their position and to check their correctness.
If setting [input_format_with_names_use_header](../operations/settings/settings.md#settings-input_format_with_names_use_header) is set to 1,
the columns from input data will be mapped to the columns from the table by their names, columns with unknown names will be skipped if setting [input_format_skip_unknown_fields](../operations/settings/settings.md#settings-input_format_skip_unknown_fields) is set to 1.
the columns from input data will be mapped to the columns from the table by their names, columns with unknown names will be skipped if setting [input_format_skip_unknown_fields](../operations/settings/settings.md#settings-input-format-skip-unknown-fields) is set to 1.
Otherwise, the first row will be skipped.
This format is also available under the name `TSVWithNames`.
@ -1776,3 +1777,70 @@ $ clickhouse-client --query="CREATE TABLE msgpack (array Array(UInt8)) ENGINE =
$ clickhouse-client --query="INSERT INTO msgpack VALUES ([0, 1, 2, 3, 42, 253, 254, 255]), ([255, 254, 253, 42, 3, 2, 1, 0])";
$ clickhouse-client --query="SELECT * FROM msgpack FORMAT MsgPack" > tmp_msgpack.msgpk;
```
## MySQLDump {#mysqldump}
ClickHouse supports reading MySQL [dumps](https://dev.mysql.com/doc/refman/8.0/en/mysqldump.html).
It reads all data from the INSERT queries belonging to one table in the dump. If there is more than one table, by default it reads data from the first one.
You can specify the name of the table to read data from using the [input_format_mysql_dump_table_name](../operations/settings/settings.md#settings-input-format-mysql-dump-table-name) setting.
If the setting [input_format_mysql_dump_map_columns](../operations/settings/settings.md#settings-input-format-mysql-dump-map-columns) is set to 1 and
the dump contains a CREATE query for the specified table or column names in the INSERT query, the columns from the input data will be mapped to the columns of the table by their names;
columns with unknown names will be skipped if the setting [input_format_skip_unknown_fields](../operations/settings/settings.md#settings-input-format-skip-unknown-fields) is set to 1.
This format supports schema inference: if the dump contains a CREATE query for the specified table, the structure is extracted from it; otherwise the schema is inferred from the data of the INSERT queries.
Examples:
File dump.sql:
```sql
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!50503 SET character_set_client = utf8mb4 */;
CREATE TABLE `test` (
`x` int DEFAULT NULL,
`y` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
/*!40101 SET character_set_client = @saved_cs_client */;
INSERT INTO `test` VALUES (1,NULL),(2,NULL),(3,NULL),(3,NULL),(4,NULL),(5,NULL),(6,7);
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!50503 SET character_set_client = utf8mb4 */;
CREATE TABLE `test 3` (
`y` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
/*!40101 SET character_set_client = @saved_cs_client */;
INSERT INTO `test 3` VALUES (1);
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!50503 SET character_set_client = utf8mb4 */;
CREATE TABLE `test2` (
`x` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
/*!40101 SET character_set_client = @saved_cs_client */;
INSERT INTO `test2` VALUES (1),(2),(3);
```
Queries:
```sql
:) desc file(dump.sql, MySQLDump) settings input_format_mysql_dump_table_name='test2'
DESCRIBE TABLE file(dump.sql, MySQLDump)
SETTINGS input_format_mysql_dump_table_name = 'test2'
Query id: 25e66c89-e10a-42a8-9b42-1ee8bbbde5ef
┌─name─┬─type────────────┬─default_type─┬─default_expression─┬─comment─┬─codec_expression─┬─ttl_expression─┐
│ x │ Nullable(Int32) │ │ │ │ │ │
└──────┴─────────────────┴──────────────┴────────────────────┴─────────┴──────────────────┴────────────────┘
:) select * from file(dump.sql, MySQLDump) settings input_format_mysql_dump_table_name='test2'
SELECT *
FROM file(dump.sql, MySQLDump)
SETTINGS input_format_mysql_dump_table_name = 'test2'
Query id: 17d59664-ebce-4053-bb79-d46a516fb590
┌─x─┐
│ 1 │
│ 2 │
│ 3 │
└───┘
```

View File

@ -267,7 +267,7 @@ See also “[Executable User Defined Functions](../../sql-reference/functions/in
**Example**
``` xml
<user_defined_executable_functions_config>*_dictionary.xml</user_defined_executable_functions_config>
<user_defined_executable_functions_config>*_function.xml</user_defined_executable_functions_config>
```
## dictionaries_lazy_load {#server_configuration_parameters-dictionaries_lazy_load}

View File

@ -6,6 +6,29 @@ slug: /en/operations/settings/settings
# Settings {#settings}
## allow_nondeterministic_mutations {#allow_nondeterministic_mutations}
User-level setting that allows mutations on replicated tables to make use of non-deterministic functions such as `dictGet`.
Given that, for example, dictionaries can be out of sync across nodes, mutations that pull values from them are disallowed on replicated tables by default. Enabling this setting allows this behavior, making it the user's responsibility to ensure that the data used is in sync across all nodes.
Default value: 0.
**Example**
``` xml
<profiles>
<default>
<allow_nondeterministic_mutations>1</allow_nondeterministic_mutations>
<!-- ... -->
</default>
<!-- ... -->
</profiles>
```
## distributed_product_mode {#distributed-product-mode}
Changes the behaviour of [distributed subqueries](../../sql-reference/operators/in.md).
@ -4225,3 +4248,18 @@ Default value: 0.
The waiting time in seconds for currently handled connections when shutdown server.
Default Value: 5.
## input_format_mysql_dump_table_name {#input-format-mysql-dump-table-name}
The name of the table to read data from in the MySQLDump input format.
## input_format_mysql_dump_map_columns {#input-format-mysql-dump-map-columns}
Enables matching columns from the table in the MySQL dump to columns of the ClickHouse table by name in the MySQLDump input format.
Possible values:
- 0 — Disabled.
- 1 — Enabled.
Default value: 1.

View File

@ -84,6 +84,7 @@ Result:
Returns the inclusive lower bound of the corresponding tumbling window.
``` sql
tumbleStart(bounds_tuple);
tumbleStart(time_attr, interval [, timezone]);
```
@ -92,6 +93,7 @@ tumbleStart(time_attr, interval [, timezone]);
Returns the exclusive upper bound of the corresponding tumbling window.
``` sql
tumbleEnd(bounds_tuple);
tumbleEnd(time_attr, interval [, timezone]);
```
@ -100,6 +102,7 @@ tumbleEnd(time_attr, interval [, timezone]);
Returns the inclusive lower bound of the corresponding hopping window.
``` sql
hopStart(bounds_tuple);
hopStart(time_attr, hop_interval, window_interval [, timezone]);
```
@ -108,5 +111,6 @@ hopStart(time_attr, hop_interval, window_interval [, timezone]);
Returns the exclusive upper bound of the corresponding hopping window.
``` sql
hopEnd(bounds_tuple);
hopEnd(time_attr, hop_interval, window_interval [, timezone]);
```

View File

@ -263,7 +263,7 @@ ClickHouse проверяет условия для `min_part_size` и `min_part
**Example**
``` xml
<user_defined_executable_functions_config>*_dictionary.xml</user_defined_executable_functions_config>
<user_defined_executable_functions_config>*_function.xml</user_defined_executable_functions_config>
```
## dictionaries_lazy_load {#server_configuration_parameters-dictionaries_lazy_load}

View File

@ -5,7 +5,6 @@
#include <Interpreters/ClientInfo.h>
#include <Core/UUID.h>
#include <base/scope_guard.h>
#include <base/shared_ptr_helper.h>
#include <boost/container/flat_set.hpp>
#include <mutex>
#include <optional>

View File

@ -1146,7 +1146,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
ConstraintsDescription{},
String{},
};
StoragePtr storage = StorageFile::create(in_file, global_context->getUserFilesPath(), args);
StoragePtr storage = std::make_shared<StorageFile>(in_file, global_context->getUserFilesPath(), args);
storage->startup();
SelectQueryInfo query_info;

View File

@ -53,7 +53,7 @@ ColumnSparse::ColumnSparse(MutableColumnPtr && values_, MutableColumnPtr && offs
#ifndef NDEBUG
const auto & offsets_data = getOffsetsData();
const auto * it = std::adjacent_find(offsets_data.begin(), offsets_data.end(), std::greater_equal<UInt64>());
const auto * it = std::adjacent_find(offsets_data.begin(), offsets_data.end(), std::greater_equal<>());
if (it != offsets_data.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Offsets of ColumnSparse must be strictly sorted");
#endif

View File

@ -89,6 +89,9 @@
M(KafkaConsumersInUse, "Number of consumers which are currently used by direct or background reads") \
M(KafkaWrites, "Number of currently running inserts to Kafka") \
M(KafkaAssignedPartitions, "Number of partitions Kafka tables currently assigned to") \
M(FilesystemCacheReadBuffers, "Number of active cache buffers") \
M(CacheFileSegments, "Number of existing cache file segments") \
M(CacheDetachedFileSegments, "Number of existing detached cache file segments") \
namespace CurrentMetrics
{

View File

@ -276,7 +276,7 @@ static void getNotEnoughMemoryMessage(std::string & msg)
#endif
}
static std::string getExtraExceptionInfo(const std::exception & e)
std::string getExtraExceptionInfo(const std::exception & e)
{
String msg;
try

View File

@ -7,6 +7,7 @@
#include <Poco/Version.h>
#include <Poco/Exception.h>
#include <base/defines.h>
#include <Common/StackTrace.h>
#include <fmt/format.h>
@ -177,6 +178,8 @@ std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded
int getCurrentExceptionCode();
int getExceptionErrorCode(std::exception_ptr e);
/// Returns string containing extra diagnostic info for specific exceptions (like "no space left on device" and "memory limit exceeded")
std::string getExtraExceptionInfo(const std::exception & e);
/// An execution status of any piece of code, contains return code and optional error
struct ExecutionStatus

View File

@ -284,7 +284,10 @@ void LRUFileCache::fillHolesWithEmptyFileSegments(
if (fill_with_detached_file_segments)
{
auto file_segment = std::make_shared<FileSegment>(current_pos, hole_size, key, this, FileSegment::State::EMPTY);
file_segment->detached = true;
{
std::lock_guard segment_lock(file_segment->mutex);
file_segment->markAsDetached(segment_lock);
}
file_segments.insert(it, file_segment);
}
else
@ -308,7 +311,10 @@ void LRUFileCache::fillHolesWithEmptyFileSegments(
if (fill_with_detached_file_segments)
{
auto file_segment = std::make_shared<FileSegment>(current_pos, hole_size, key, this, FileSegment::State::EMPTY);
file_segment->detached = true;
{
std::lock_guard segment_lock(file_segment->mutex);
file_segment->markAsDetached(segment_lock);
}
file_segments.insert(file_segments.end(), file_segment);
}
else
@ -364,7 +370,10 @@ FileSegmentsHolder LRUFileCache::get(const Key & key, size_t offset, size_t size
if (file_segments.empty())
{
auto file_segment = std::make_shared<FileSegment>(offset, size, key, this, FileSegment::State::EMPTY);
file_segment->detached = true;
{
std::lock_guard segment_lock(file_segment->mutex);
file_segment->markAsDetached(segment_lock);
}
file_segments = { file_segment };
}
else

View File

@ -6,6 +6,11 @@
#include <IO/Operators.h>
#include <filesystem>
namespace CurrentMetrics
{
extern const Metric CacheDetachedFileSegments;
}
namespace DB
{
@ -570,7 +575,7 @@ void FileSegment::completeImpl(std::lock_guard<std::mutex> & cache_lock, std::lo
cache->reduceSizeToDownloaded(key(), offset(), cache_lock, segment_lock);
}
detached = true;
markAsDetached(segment_lock);
if (cache_writer)
{
@ -685,7 +690,7 @@ void FileSegment::detach(std::lock_guard<std::mutex> & cache_lock, std::lock_gua
if (detached)
return;
detached = true;
markAsDetached(segment_lock);
if (!hasFinalizedState())
{
@ -693,6 +698,19 @@ void FileSegment::detach(std::lock_guard<std::mutex> & cache_lock, std::lock_gua
}
}
void FileSegment::markAsDetached(std::lock_guard<std::mutex> & /* segment_lock */)
{
detached = true;
CurrentMetrics::add(CurrentMetrics::CacheDetachedFileSegments);
}
FileSegment::~FileSegment()
{
std::lock_guard segment_lock(mutex);
if (detached)
CurrentMetrics::sub(CurrentMetrics::CacheDetachedFileSegments);
}
FileSegmentsHolder::~FileSegmentsHolder()
{
/// In CacheableReadBufferFromRemoteFS file segment's downloader removes file segments from

View File

@ -8,6 +8,11 @@
namespace Poco { class Logger; }
namespace CurrentMetrics
{
extern const Metric CacheFileSegments;
}
namespace DB
{
@ -66,6 +71,8 @@ public:
size_t offset_, size_t size_, const Key & key_,
IFileCache * cache_, State download_state_);
~FileSegment();
State state() const;
static String stateToString(FileSegment::State state);
@ -156,7 +163,7 @@ private:
void assertDetachedStatus(std::lock_guard<std::mutex> & segment_lock) const;
bool hasFinalizedState() const;
bool isDetached(std::lock_guard<std::mutex> & /* segment_lock */) const { return detached; }
void markAsDetached(std::lock_guard<std::mutex> & segment_lock);
void setDownloaded(std::lock_guard<std::mutex> & segment_lock);
void setDownloadFailed(std::lock_guard<std::mutex> & segment_lock);
@ -215,6 +222,8 @@ private:
std::atomic<bool> is_downloaded{false};
std::atomic<size_t> hits_count = 0; /// cache hits.
std::atomic<size_t> ref_count = 0; /// Used for getting snapshot state
CurrentMetrics::Increment metric_increment{CurrentMetrics::CacheFileSegments};
};
struct FileSegmentsHolder : private boost::noncopyable

View File

@ -190,9 +190,8 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryT
if (unlikely(current_hard_limit && will_be > current_hard_limit) && memoryTrackerCanThrow(level, false) && throw_if_memory_exceeded)
{
bool need_to_throw = true;
bool try_to_free_memory = overcommit_tracker != nullptr && query_tracker != nullptr;
if (try_to_free_memory)
need_to_throw = overcommit_tracker->needToStopQuery(query_tracker);
if (auto * overcommit_tracker_ptr = overcommit_tracker.load(std::memory_order_relaxed); overcommit_tracker_ptr != nullptr && query_tracker != nullptr)
need_to_throw = overcommit_tracker_ptr->needToStopQuery(query_tracker, size);
if (need_to_throw)
{
@ -211,6 +210,9 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryT
}
else
{
// If OvercommitTracker::needToStopQuery returned false, it guarantees that enough memory has been freed.
// This memory is already counted in the variable `amount` at the moment of `will_be` initialization.
// Now we just need to update the value stored in `will_be`, because it should have changed.
will_be = amount.load(std::memory_order_relaxed);
}
}
@ -308,6 +310,8 @@ void MemoryTracker::free(Int64 size)
accounted_size += new_amount;
}
}
if (auto * overcommit_tracker_ptr = overcommit_tracker.load(std::memory_order_relaxed); overcommit_tracker_ptr)
overcommit_tracker_ptr->tryContinueQueryExecutionAfterFree(accounted_size);
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->free(size);

View File

@ -73,7 +73,7 @@ private:
/// This description will be used as a prefix in log messages (if it isn't nullptr)
std::atomic<const char *> description_ptr = nullptr;
OvercommitTracker * overcommit_tracker = nullptr;
std::atomic<OvercommitTracker *> overcommit_tracker = nullptr;
bool updatePeak(Int64 will_be, bool log_memory_usage);
void logMemoryUsage(Int64 current) const;
@ -188,13 +188,18 @@ public:
void setOvercommitTracker(OvercommitTracker * tracker) noexcept
{
overcommit_tracker = tracker;
overcommit_tracker.store(tracker, std::memory_order_relaxed);
}
void resetOvercommitTracker() noexcept
{
overcommit_tracker.store(nullptr, std::memory_order_relaxed);
}
/// Reset the accumulated data
void resetCounters();
/// Reset the accumulated data and the parent.
/// Reset the accumulated data.
void reset();
/// Reset current counter to a new value.

View File

@ -11,8 +11,11 @@ constexpr std::chrono::microseconds ZERO_MICROSEC = 0us;
OvercommitTracker::OvercommitTracker(std::mutex & global_mutex_)
: max_wait_time(ZERO_MICROSEC)
, picked_tracker(nullptr)
, cancelation_state(QueryCancelationState::NONE)
, cancellation_state(QueryCancellationState::NONE)
, global_mutex(global_mutex_)
, freed_memory(0)
, required_memory(0)
, allow_release(true)
{}
void OvercommitTracker::setMaxWaitTime(UInt64 wait_time)
@ -21,12 +24,12 @@ void OvercommitTracker::setMaxWaitTime(UInt64 wait_time)
max_wait_time = wait_time * 1us;
}
bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker)
bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int64 amount)
{
// NOTE: Do not change the order of locks
//
// global_mutex must be acquired before overcommit_m, because
// method OvercommitTracker::unsubscribe(MemoryTracker *) is
// method OvercommitTracker::onQueryStop(MemoryTracker *) is
// always called with already acquired global_mutex in
// ProcessListEntry::~ProcessListEntry().
std::unique_lock<std::mutex> global_lock(global_mutex);
@ -36,42 +39,86 @@ bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker)
return true;
pickQueryToExclude();
assert(cancelation_state == QueryCancelationState::RUNNING);
assert(cancellation_state != QueryCancellationState::NONE);
global_lock.unlock();
// If no query was chosen we need to stop the current query.
// This may happen if no soft limit is set.
if (picked_tracker == nullptr)
{
cancelation_state = QueryCancelationState::NONE;
// Here the state cannot be RUNNING, because that requires
// picked_tracker to be a non-null pointer.
assert(cancellation_state == QueryCancellationState::SELECTED);
cancellation_state = QueryCancellationState::NONE;
return true;
}
if (picked_tracker == tracker)
return true;
bool timeout = !cv.wait_for(lk, max_wait_time, [this]()
{
return cancelation_state == QueryCancelationState::NONE;
// The query of the memory tracker passed as an argument was chosen.
// This may happen even when the current state is RUNNING, because
// ThreadStatus::~ThreadStatus may call MemoryTracker::alloc.
cancellation_state = QueryCancellationState::RUNNING;
return true;
}
allow_release = true;
required_memory += amount;
required_per_thread[tracker] = amount;
bool timeout = !cv.wait_for(lk, max_wait_time, [this, tracker]()
{
return required_per_thread[tracker] == 0 || cancellation_state == QueryCancellationState::NONE;
});
if (timeout)
LOG_DEBUG(getLogger(), "Need to stop query because reached waiting timeout");
else
LOG_DEBUG(getLogger(), "Memory freed within timeout");
return timeout;
LOG_DEBUG(getLogger(), "Memory was{} freed within timeout", (timeout ? " not" : ""));
required_memory -= amount;
Int64 still_need = required_per_thread[tracker]; // If enough memory is freed it will be 0
required_per_thread.erase(tracker);
// If threads were not released since the last call of this method,
// we can release them now.
if (allow_release && required_memory <= freed_memory && still_need != 0)
releaseThreads();
// The whole required amount of memory is free now and the query selected to stop doesn't know about it.
// As we don't need to free memory, we can continue execution of the selected query.
if (required_memory == 0 && cancellation_state == QueryCancellationState::SELECTED)
reset();
return timeout || still_need != 0;
}
void OvercommitTracker::unsubscribe(MemoryTracker * tracker)
void OvercommitTracker::tryContinueQueryExecutionAfterFree(Int64 amount)
{
std::lock_guard guard(overcommit_m);
if (cancellation_state != QueryCancellationState::NONE)
{
freed_memory += amount;
if (freed_memory >= required_memory)
releaseThreads();
}
}
void OvercommitTracker::onQueryStop(MemoryTracker * tracker)
{
std::unique_lock<std::mutex> lk(overcommit_m);
if (picked_tracker == tracker)
{
LOG_DEBUG(getLogger(), "Picked query stopped");
picked_tracker = nullptr;
cancelation_state = QueryCancelationState::NONE;
reset();
cv.notify_all();
}
}
void OvercommitTracker::releaseThreads()
{
for (auto & required : required_per_thread)
required.second = 0;
freed_memory = 0;
allow_release = false; // To avoid a repeated call of this method from OvercommitTracker::needToStopQuery
cv.notify_all();
}
UserOvercommitTracker::UserOvercommitTracker(DB::ProcessList * process_list, DB::ProcessListForUser * user_process_list_)
: OvercommitTracker(process_list->mutex)
, user_process_list(user_process_list_)
@ -122,17 +169,17 @@ void GlobalOvercommitTracker::pickQueryToExcludeImpl()
for (auto const & query : process_list->processes)
{
if (query.isKilled())
return;
continue;
Int64 user_soft_limit = 0;
if (auto const * user_process_list = query.getUserProcessList())
user_soft_limit = user_process_list->user_memory_tracker.getSoftLimit();
if (user_soft_limit == 0)
return;
continue;
auto * memory_tracker = query.getMemoryTracker();
if (!memory_tracker)
return;
continue;
auto ratio = memory_tracker->getOvercommitRatio(user_soft_limit);
LOG_DEBUG(logger, "Query has ratio {}/{}", ratio.committed, ratio.soft_limit);
if (current_ratio < ratio)

View File

@ -34,6 +34,13 @@ struct OvercommitRatio
class MemoryTracker;
enum class QueryCancellationState
{
NONE = 0, // Hard limit is not reached, there is no selected query to kill.
SELECTED = 1, // Hard limit is reached, a query to stop was chosen but it is not yet aware of the cancellation.
RUNNING = 2, // Hard limit is reached, the selected query has started the process of cancellation.
};
// Usually it's hard to set some reasonable hard memory limit
// (especially the default value). This class introduces a new
// mechanism for limiting memory usage.
@ -45,9 +52,11 @@ struct OvercommitTracker : boost::noncopyable
{
void setMaxWaitTime(UInt64 wait_time);
bool needToStopQuery(MemoryTracker * tracker);
bool needToStopQuery(MemoryTracker * tracker, Int64 amount);
void unsubscribe(MemoryTracker * tracker);
void tryContinueQueryExecutionAfterFree(Int64 amount);
void onQueryStop(MemoryTracker * tracker);
virtual ~OvercommitTracker() = default;
@ -58,23 +67,16 @@ protected:
// This mutex is used to disallow concurrent access
// to the picked_tracker and cancellation_state variables.
mutable std::mutex overcommit_m;
mutable std::condition_variable cv;
std::mutex overcommit_m;
std::condition_variable cv;
std::chrono::microseconds max_wait_time;
enum class QueryCancelationState
{
NONE,
RUNNING,
};
// Specifies the memory tracker of the query chosen to stop.
// If soft limit is not set, all the queries which reach hard limit must stop.
// This case is represented as picked tracker pointer is set to nullptr and
// overcommit tracker is in RUNNING state.
// overcommit tracker is in SELECTED state.
MemoryTracker * picked_tracker;
QueryCancelationState cancelation_state;
virtual Poco::Logger * getLogger() = 0;
@ -82,19 +84,37 @@ private:
void pickQueryToExclude()
{
if (cancelation_state != QueryCancelationState::RUNNING)
if (cancellation_state == QueryCancellationState::NONE)
{
pickQueryToExcludeImpl();
cancelation_state = QueryCancelationState::RUNNING;
cancellation_state = QueryCancellationState::SELECTED;
}
}
void reset() noexcept
{
picked_tracker = nullptr;
cancellation_state = QueryCancellationState::NONE;
freed_memory = 0;
allow_release = true;
}
void releaseThreads();
QueryCancellationState cancellation_state;
std::unordered_map<MemoryTracker *, Int64> required_per_thread;
// Global mutex which is used in ProcessList to synchronize
// insertion and deletion of queries.
// OvercommitTracker::pickQueryToExcludeImpl() implementations
// require this mutex to be locked, because they read the list (or a sublist)
// of queries.
std::mutex & global_mutex;
Int64 freed_memory;
Int64 required_memory;
bool allow_release;
};
namespace DB
@ -110,7 +130,7 @@ struct UserOvercommitTracker : OvercommitTracker
~UserOvercommitTracker() override = default;
protected:
void pickQueryToExcludeImpl() override final;
void pickQueryToExcludeImpl() override;
Poco::Logger * getLogger() override final { return logger; }
private:
@ -125,7 +145,7 @@ struct GlobalOvercommitTracker : OvercommitTracker
~GlobalOvercommitTracker() override = default;
protected:
void pickQueryToExcludeImpl() override final;
void pickQueryToExcludeImpl() override;
Poco::Logger * getLogger() override final { return logger; }
private:

View File

@ -16,6 +16,7 @@
M(QueryTimeMicroseconds, "Total time of all queries.") \
M(SelectQueryTimeMicroseconds, "Total time of SELECT queries.") \
M(InsertQueryTimeMicroseconds, "Total time of INSERT queries.") \
M(OtherQueryTimeMicroseconds, "Total time of queries that are not SELECT or INSERT.") \
M(FileOpen, "Number of files opened.") \
M(Seek, "Number of times the 'lseek' function was called.") \
M(ReadBufferFromFileDescriptorRead, "Number of reads (read/pread) from a file descriptor. Does not include sockets.") \
@ -240,18 +241,23 @@
M(NotCreatedLogEntryForMutation, "Log entry to mutate parts in ReplicatedMergeTree is not created due to concurrent log update by another replica.") \
\
M(S3ReadMicroseconds, "Time of GET and HEAD requests to S3 storage.") \
M(S3ReadBytes, "Read bytes (incoming) in GET and HEAD requests to S3 storage.") \
M(S3ReadRequestsCount, "Number of GET and HEAD requests to S3 storage.") \
M(S3ReadRequestsErrors, "Number of non-throttling errors in GET and HEAD requests to S3 storage.") \
M(S3ReadRequestsThrottling, "Number of 429 and 503 errors in GET and HEAD requests to S3 storage.") \
M(S3ReadRequestsRedirects, "Number of redirects in GET and HEAD requests to S3 storage.") \
\
M(S3WriteMicroseconds, "Time of POST, DELETE, PUT and PATCH requests to S3 storage.") \
M(S3WriteBytes, "Write bytes (outgoing) in POST, DELETE, PUT and PATCH requests to S3 storage.") \
M(S3WriteRequestsCount, "Number of POST, DELETE, PUT and PATCH requests to S3 storage.") \
M(S3WriteRequestsErrors, "Number of non-throttling errors in POST, DELETE, PUT and PATCH requests to S3 storage.") \
M(S3WriteRequestsThrottling, "Number of 429 and 503 errors in POST, DELETE, PUT and PATCH requests to S3 storage.") \
M(S3WriteRequestsRedirects, "Number of redirects in POST, DELETE, PUT and PATCH requests to S3 storage.") \
\
M(ReadBufferFromS3Microseconds, "Time spent reading from S3.") \
M(ReadBufferFromS3Bytes, "Bytes read from S3.") \
M(ReadBufferFromS3RequestsErrors, "Number of exceptions while reading from S3.") \
\
M(WriteBufferFromS3Bytes, "Bytes written to S3.") \
\
M(QueryMemoryLimitExceeded, "Number of times when memory limit exceeded for query.") \
\
M(RemoteFSReadMicroseconds, "Time of reading from remote filesystem.") \

View File

@ -27,7 +27,6 @@
namespace ProfileEvents
{
#if defined(__linux__)
extern const Event OSIOWaitMicroseconds;
extern const Event OSCPUWaitMicroseconds;
extern const Event OSCPUVirtualTimeMicroseconds;
@ -61,7 +60,6 @@ namespace ProfileEvents
extern const Event PerfInstructionTLBMisses;
extern const Event PerfLocalMemoryReferences;
extern const Event PerfLocalMemoryMisses;
#endif
}
namespace DB

View File

@ -0,0 +1,442 @@
#include <gtest/gtest.h>
#include <chrono>
#include <thread>
#include <vector>
#include <Common/MemoryTracker.h>
#include <Common/OvercommitTracker.h>
#include <Interpreters/ProcessList.h>
using namespace std::chrono_literals;
using namespace DB;
template <typename BaseTracker>
struct OvercommitTrackerForTest : BaseTracker
{
template <typename ...Ts>
explicit OvercommitTrackerForTest(Ts && ...args)
: BaseTracker(std::move(args)...)
{}
void setCandidate(MemoryTracker * candidate)
{
tracker = candidate;
}
protected:
void pickQueryToExcludeImpl() override
{
BaseTracker::picked_tracker = tracker;
}
MemoryTracker * tracker;
};
using UserOvercommitTrackerForTest = OvercommitTrackerForTest<UserOvercommitTracker>;
using GlobalOvercommitTrackerForTest = OvercommitTrackerForTest<GlobalOvercommitTracker>;
static constexpr UInt64 WAIT_TIME = 4'000'000;
template <typename T>
void free_not_continue_test(T & overcommit_tracker)
{
overcommit_tracker.setMaxWaitTime(WAIT_TIME);
static constexpr size_t THREADS = 5;
std::vector<MemoryTracker> trackers(THREADS);
std::atomic<int> need_to_stop = 0;
std::vector<std::thread> threads;
threads.reserve(THREADS);
MemoryTracker picked;
overcommit_tracker.setCandidate(&picked);
for (size_t i = 0; i < THREADS; ++i)
{
threads.push_back(std::thread(
[&, i]()
{
if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
++need_to_stop;
}
));
}
std::thread(
[&]()
{
std::this_thread::sleep_for(1000ms);
overcommit_tracker.tryContinueQueryExecutionAfterFree(50);
}
).join();
for (auto & thread : threads)
{
thread.join();
}
ASSERT_EQ(need_to_stop, THREADS);
}
TEST(OvercommitTracker, UserFreeNotContinue)
{
ProcessList process_list;
ProcessListForUser user_process_list(&process_list);
UserOvercommitTrackerForTest user_overcommit_tracker(&process_list, &user_process_list);
free_not_continue_test(user_overcommit_tracker);
}
TEST(OvercommitTracker, GlobalFreeNotContinue)
{
ProcessList process_list;
GlobalOvercommitTrackerForTest global_overcommit_tracker(&process_list);
free_not_continue_test(global_overcommit_tracker);
}
template <typename T>
void free_continue_test(T & overcommit_tracker)
{
overcommit_tracker.setMaxWaitTime(WAIT_TIME);
static constexpr size_t THREADS = 5;
std::vector<MemoryTracker> trackers(THREADS);
std::atomic<int> need_to_stop = 0;
std::vector<std::thread> threads;
threads.reserve(THREADS);
MemoryTracker picked;
overcommit_tracker.setCandidate(&picked);
for (size_t i = 0; i < THREADS; ++i)
{
threads.push_back(std::thread(
[&, i]()
{
if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
++need_to_stop;
}
));
}
std::thread(
[&]()
{
std::this_thread::sleep_for(1000ms);
overcommit_tracker.tryContinueQueryExecutionAfterFree(5000);
}
).join();
for (auto & thread : threads)
{
thread.join();
}
ASSERT_EQ(need_to_stop, 0);
}
TEST(OvercommitTracker, UserFreeContinue)
{
ProcessList process_list;
ProcessListForUser user_process_list(&process_list);
UserOvercommitTrackerForTest user_overcommit_tracker(&process_list, &user_process_list);
free_continue_test(user_overcommit_tracker);
}
TEST(OvercommitTracker, GlobalFreeContinue)
{
ProcessList process_list;
GlobalOvercommitTrackerForTest global_overcommit_tracker(&process_list);
free_continue_test(global_overcommit_tracker);
}
template <typename T>
void free_continue_and_alloc_test(T & overcommit_tracker)
{
overcommit_tracker.setMaxWaitTime(WAIT_TIME);
static constexpr size_t THREADS = 5;
std::vector<MemoryTracker> trackers(THREADS);
std::atomic<int> need_to_stop = 0;
std::vector<std::thread> threads;
threads.reserve(THREADS);
MemoryTracker picked;
overcommit_tracker.setCandidate(&picked);
for (size_t i = 0; i < THREADS; ++i)
{
threads.push_back(std::thread(
[&, i]()
{
if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
++need_to_stop;
}
));
}
bool stopped_next = false;
std::thread(
[&]()
{
MemoryTracker failed;
std::this_thread::sleep_for(1000ms);
overcommit_tracker.tryContinueQueryExecutionAfterFree(5000);
stopped_next = overcommit_tracker.needToStopQuery(&failed, 100);
}
).join();
for (auto & thread : threads)
{
thread.join();
}
ASSERT_EQ(need_to_stop, 0);
ASSERT_EQ(stopped_next, true);
}
TEST(OvercommitTracker, UserFreeContinueAndAlloc)
{
ProcessList process_list;
ProcessListForUser user_process_list(&process_list);
UserOvercommitTrackerForTest user_overcommit_tracker(&process_list, &user_process_list);
free_continue_and_alloc_test(user_overcommit_tracker);
}
TEST(OvercommitTracker, GlobalFreeContinueAndAlloc)
{
ProcessList process_list;
GlobalOvercommitTrackerForTest global_overcommit_tracker(&process_list);
free_continue_and_alloc_test(global_overcommit_tracker);
}
template <typename T>
void free_continue_and_alloc_2_test(T & overcommit_tracker)
{
overcommit_tracker.setMaxWaitTime(WAIT_TIME);
static constexpr size_t THREADS = 5;
std::vector<MemoryTracker> trackers(THREADS);
std::atomic<int> need_to_stop = 0;
std::vector<std::thread> threads;
threads.reserve(THREADS);
MemoryTracker picked;
overcommit_tracker.setCandidate(&picked);
for (size_t i = 0; i < THREADS; ++i)
{
threads.push_back(std::thread(
[&, i]()
{
if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
++need_to_stop;
}
));
}
bool stopped_next = false;
threads.push_back(std::thread(
[&]()
{
MemoryTracker failed;
std::this_thread::sleep_for(1000ms);
overcommit_tracker.tryContinueQueryExecutionAfterFree(5000);
stopped_next = overcommit_tracker.needToStopQuery(&failed, 100);
}
));
threads.push_back(std::thread(
[&]()
{
std::this_thread::sleep_for(2000ms);
overcommit_tracker.tryContinueQueryExecutionAfterFree(90);
}
));
for (auto & thread : threads)
{
thread.join();
}
ASSERT_EQ(need_to_stop, 0);
ASSERT_EQ(stopped_next, true);
}
TEST(OvercommitTracker, UserFreeContinueAndAlloc2)
{
ProcessList process_list;
ProcessListForUser user_process_list(&process_list);
UserOvercommitTrackerForTest user_overcommit_tracker(&process_list, &user_process_list);
free_continue_and_alloc_2_test(user_overcommit_tracker);
}
TEST(OvercommitTracker, GlobalFreeContinueAndAlloc2)
{
ProcessList process_list;
GlobalOvercommitTrackerForTest global_overcommit_tracker(&process_list);
free_continue_and_alloc_2_test(global_overcommit_tracker);
}
template <typename T>
void free_continue_and_alloc_3_test(T & overcommit_tracker)
{
overcommit_tracker.setMaxWaitTime(WAIT_TIME);
static constexpr size_t THREADS = 5;
std::vector<MemoryTracker> trackers(THREADS);
std::atomic<int> need_to_stop = 0;
std::vector<std::thread> threads;
threads.reserve(THREADS);
MemoryTracker picked;
overcommit_tracker.setCandidate(&picked);
for (size_t i = 0; i < THREADS; ++i)
{
threads.push_back(std::thread(
[&, i]()
{
if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
++need_to_stop;
}
));
}
bool stopped_next = false;
threads.push_back(std::thread(
[&]()
{
MemoryTracker failed;
std::this_thread::sleep_for(1000ms);
overcommit_tracker.tryContinueQueryExecutionAfterFree(5000);
stopped_next = overcommit_tracker.needToStopQuery(&failed, 100);
}
));
threads.push_back(std::thread(
[&]()
{
std::this_thread::sleep_for(2000ms);
overcommit_tracker.tryContinueQueryExecutionAfterFree(100);
}
));
for (auto & thread : threads)
{
thread.join();
}
ASSERT_EQ(need_to_stop, 0);
ASSERT_EQ(stopped_next, false);
}
TEST(OvercommitTracker, UserFreeContinueAndAlloc3)
{
ProcessList process_list;
ProcessListForUser user_process_list(&process_list);
UserOvercommitTrackerForTest user_overcommit_tracker(&process_list, &user_process_list);
free_continue_and_alloc_3_test(user_overcommit_tracker);
}
TEST(OvercommitTracker, GlobalFreeContinueAndAlloc3)
{
ProcessList process_list;
GlobalOvercommitTrackerForTest global_overcommit_tracker(&process_list);
free_continue_and_alloc_3_test(global_overcommit_tracker);
}
template <typename T>
void free_continue_2_test(T & overcommit_tracker)
{
overcommit_tracker.setMaxWaitTime(WAIT_TIME);
static constexpr size_t THREADS = 5;
std::vector<MemoryTracker> trackers(THREADS);
std::atomic<int> need_to_stop = 0;
std::vector<std::thread> threads;
threads.reserve(THREADS);
MemoryTracker picked;
overcommit_tracker.setCandidate(&picked);
for (size_t i = 0; i < THREADS; ++i)
{
threads.push_back(std::thread(
[&, i]()
{
if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
++need_to_stop;
}
));
}
std::thread(
[&]()
{
std::this_thread::sleep_for(1000ms);
overcommit_tracker.tryContinueQueryExecutionAfterFree(300);
}
).join();
for (auto & thread : threads)
{
thread.join();
}
ASSERT_EQ(need_to_stop, 2);
}
TEST(OvercommitTracker, UserFreeContinue2)
{
ProcessList process_list;
ProcessListForUser user_process_list(&process_list);
UserOvercommitTrackerForTest user_overcommit_tracker(&process_list, &user_process_list);
free_continue_2_test(user_overcommit_tracker);
}
TEST(OvercommitTracker, GlobalFreeContinue2)
{
ProcessList process_list;
GlobalOvercommitTrackerForTest global_overcommit_tracker(&process_list);
free_continue_2_test(global_overcommit_tracker);
}
template <typename T>
void query_stop_not_continue_test(T & overcommit_tracker)
{
overcommit_tracker.setMaxWaitTime(WAIT_TIME);
std::atomic<int> need_to_stop = 0;
MemoryTracker picked;
overcommit_tracker.setCandidate(&picked);
MemoryTracker another;
auto thread = std::thread(
[&]()
{
if (overcommit_tracker.needToStopQuery(&another, 100))
++need_to_stop;
}
);
std::this_thread::sleep_for(1000ms);
overcommit_tracker.onQueryStop(&picked);
thread.join();
ASSERT_EQ(need_to_stop, 1);
}
TEST(OvercommitTracker, UserQueryStopNotContinue)
{
ProcessList process_list;
ProcessListForUser user_process_list(&process_list);
UserOvercommitTrackerForTest user_overcommit_tracker(&process_list, &user_process_list);
query_stop_not_continue_test(user_overcommit_tracker);
}
TEST(OvercommitTracker, GlobalQueryStopNotContinue)
{
ProcessList process_list;
GlobalOvercommitTrackerForTest global_overcommit_tracker(&process_list);
query_stop_not_continue_test(global_overcommit_tracker);
}

View File

@ -6,7 +6,6 @@
#include <memory>
#include <string>
#include <base/shared_ptr_helper.h>
#include <Common/Exception.h>
#include <base/demangle.h>
@ -60,9 +59,26 @@ To typeid_cast(From * from)
}
}
namespace detail
{
template <typename T>
struct is_shared_ptr : std::false_type
{
};
template <typename T>
struct is_shared_ptr<std::shared_ptr<T>> : std::true_type
{
};
template <typename T>
inline constexpr bool is_shared_ptr_v = is_shared_ptr<T>::value;
}
template <typename To, typename From>
requires is_shared_ptr_v<To>
requires detail::is_shared_ptr_v<To>
To typeid_cast(const std::shared_ptr<From> & from)
{
try

View File

@ -84,7 +84,8 @@ public:
}
else if (compression_method == CompressionMethod::Zstd)
{
compressed_buffer = std::make_unique<ZstdDeflatingAppendableWriteBuffer>(std::move(file_buf), /* compression level = */ 3, /* append_to_existing_stream = */ mode == WriteMode::Append);
compressed_buffer = std::make_unique<ZstdDeflatingAppendableWriteBuffer>(
std::move(file_buf), /* compression level = */ 3, /* append_to_existing_file_ = */ mode == WriteMode::Append);
}
else
{
@ -140,7 +141,7 @@ private:
std::string filepath;
std::unique_ptr<WriteBufferFromFile> file_buf;
std::unique_ptr<WriteBufferWithOwnMemoryDecorator> compressed_buffer;
std::unique_ptr<ZstdDeflatingAppendableWriteBuffer> compressed_buffer;
uint64_t start_index;
};
@ -712,14 +713,21 @@ void Changelog::flush()
current_writer->flush(force_sync);
}
void Changelog::shutdown()
{
if (!log_files_to_delete_queue.isFinished())
log_files_to_delete_queue.finish();
if (clean_log_thread.joinable())
clean_log_thread.join();
}
Changelog::~Changelog()
{
try
{
flush();
log_files_to_delete_queue.finish();
if (clean_log_thread.joinable())
clean_log_thread.join();
shutdown();
}
catch (...)
{

View File

@ -121,6 +121,8 @@ public:
/// Fsync latest log to disk and flush buffer
void flush();
void shutdown();
uint64_t size() const
{
return logs.size();

View File

@ -118,4 +118,18 @@ nuraft::ptr<nuraft::log_entry> KeeperLogStore::getLatestConfigChange() const
return changelog.getLatestConfigChange();
}
void KeeperLogStore::shutdownChangelog()
{
std::lock_guard lock(changelog_lock);
changelog.shutdown();
}
bool KeeperLogStore::flushChangelogAndShutdown()
{
std::lock_guard lock(changelog_lock);
changelog.flush();
changelog.shutdown();
return true;
}
}

View File

@ -52,6 +52,12 @@ public:
/// Call fsync to the stored data
bool flush() override;
/// Stop background cleanup thread in changelog
void shutdownChangelog();
/// Flush logstore and call shutdown of background thread in changelog
bool flushChangelogAndShutdown();
/// Current log storage size
uint64_t size() const;

View File

@ -360,7 +360,7 @@ void KeeperServer::shutdownRaftServer()
void KeeperServer::shutdown()
{
state_machine->shutdownStorage();
state_manager->flushLogStore();
state_manager->flushAndShutDownLogStore();
shutdownRaftServer();
}

View File

@ -249,9 +249,9 @@ ClusterConfigPtr KeeperStateManager::getLatestConfigFromLogStore() const
return nullptr;
}
void KeeperStateManager::flushLogStore()
void KeeperStateManager::flushAndShutDownLogStore()
{
log_store->flush();
log_store->flushChangelogAndShutdown();
}
void KeeperStateManager::save_config(const nuraft::cluster_config & config)

View File

@ -52,7 +52,8 @@ public:
void loadLogStore(uint64_t last_commited_index, uint64_t logs_to_keep);
void flushLogStore();
/// Flush logstore and call shutdown of background thread
void flushAndShutDownLogStore();
/// Called on server start, in our case we don't use any separate logic for load
nuraft::ptr<nuraft::cluster_config> load_config() override

View File

@ -1604,6 +1604,111 @@ TEST_P(CoordinationTest, TestStorageSnapshotDifferentCompressions)
EXPECT_EQ(restored_storage->session_and_timeout.size(), 2);
}
TEST_P(CoordinationTest, ChangelogInsertThreeTimesSmooth)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
{
std::cerr << "================First time=====================\n";
DB::KeeperLogStore changelog("./logs", 100, true, params.enable_compression);
changelog.init(1, 0);
auto entry = getLogEntry("hello_world", 1000);
changelog.append(entry);
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.next_slot(), 2);
}
{
std::cerr << "================Second time=====================\n";
DB::KeeperLogStore changelog("./logs", 100, true, params.enable_compression);
changelog.init(1, 0);
auto entry = getLogEntry("hello_world", 1000);
changelog.append(entry);
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.next_slot(), 3);
}
{
std::cerr << "================Third time=====================\n";
DB::KeeperLogStore changelog("./logs", 100, true, params.enable_compression);
changelog.init(1, 0);
auto entry = getLogEntry("hello_world", 1000);
changelog.append(entry);
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.next_slot(), 4);
}
{
std::cerr << "================Fourth time=====================\n";
DB::KeeperLogStore changelog("./logs", 100, true, params.enable_compression);
changelog.init(1, 0);
auto entry = getLogEntry("hello_world", 1000);
changelog.append(entry);
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.next_slot(), 5);
}
}
TEST_P(CoordinationTest, ChangelogInsertMultipleTimesSmooth)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
for (size_t i = 0; i < 36; ++i)
{
std::cerr << "================First time=====================\n";
DB::KeeperLogStore changelog("./logs", 100, true, params.enable_compression);
changelog.init(1, 0);
for (size_t j = 0; j < 7; ++j)
{
auto entry = getLogEntry("hello_world", 7);
changelog.append(entry);
}
changelog.end_of_append_batch(0, 0);
}
DB::KeeperLogStore changelog("./logs", 100, true, params.enable_compression);
changelog.init(1, 0);
EXPECT_EQ(changelog.next_slot(), 36 * 7 + 1);
}
TEST_P(CoordinationTest, ChangelogInsertThreeTimesHard)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
std::cerr << "================First time=====================\n";
DB::KeeperLogStore changelog1("./logs", 100, true, params.enable_compression);
changelog1.init(1, 0);
auto entry = getLogEntry("hello_world", 1000);
changelog1.append(entry);
changelog1.end_of_append_batch(0, 0);
EXPECT_EQ(changelog1.next_slot(), 2);
std::cerr << "================Second time=====================\n";
DB::KeeperLogStore changelog2("./logs", 100, true, params.enable_compression);
changelog2.init(1, 0);
entry = getLogEntry("hello_world", 1000);
changelog2.append(entry);
changelog2.end_of_append_batch(0, 0);
EXPECT_EQ(changelog2.next_slot(), 3);
std::cerr << "================Third time=====================\n";
DB::KeeperLogStore changelog3("./logs", 100, true, params.enable_compression);
changelog3.init(1, 0);
entry = getLogEntry("hello_world", 1000);
changelog3.append(entry);
changelog3.end_of_append_batch(0, 0);
EXPECT_EQ(changelog3.next_slot(), 4);
std::cerr << "================Fourth time=====================\n";
DB::KeeperLogStore changelog4("./logs", 100, true, params.enable_compression);
changelog4.init(1, 0);
entry = getLogEntry("hello_world", 1000);
changelog4.append(entry);
changelog4.end_of_append_batch(0, 0);
EXPECT_EQ(changelog4.next_slot(), 5);
}
TEST_P(CoordinationTest, TestLogGap)
{

View File

@ -612,6 +612,7 @@ DataTypes Block::getDataTypes() const
return res;
}
Names Block::getDataTypeNames() const
{
Names res;
@ -624,6 +625,12 @@ Names Block::getDataTypeNames() const
}
std::unordered_map<String, size_t> Block::getNamesToIndexesMap() const
{
return index_by_name;
}
bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs)
{
return checkBlockStructure<bool>(lhs, rhs, "", false);

View File

@ -92,6 +92,7 @@ public:
Names getNames() const;
DataTypes getDataTypes() const;
Names getDataTypeNames() const;
std::unordered_map<String, size_t> getNamesToIndexesMap() const;
/// Returns the number of rows from the first column in the block that is not nullptr. If there are no columns, returns 0.
size_t rows() const;

View File

@ -56,7 +56,7 @@ void MySQLClient::connect()
in = std::make_shared<ReadBufferFromPocoSocket>(*socket);
out = std::make_shared<WriteBufferFromPocoSocket>(*socket);
packet_endpoint = MySQLProtocol::PacketEndpoint::create(*in, *out, sequence_id);
packet_endpoint = std::make_shared<MySQLProtocol::PacketEndpoint>(*in, *out, sequence_id);
handshake();
}

View File

@ -56,7 +56,7 @@ private:
std::shared_ptr<WriteBuffer> out;
std::unique_ptr<Poco::Net::StreamSocket> socket;
std::optional<Poco::Net::SocketAddress> address;
std::shared_ptr<PacketEndpoint> packet_endpoint;
MySQLProtocol::PacketEndpointPtr packet_endpoint;
void handshake();
void registerSlaveOnMaster(UInt32 slave_id);

View File

@ -1,11 +1,11 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include "IMySQLReadPacket.h"
#include "IMySQLWritePacket.h"
#include "IO/MySQLPacketPayloadReadBuffer.h"
#include <base/shared_ptr_helper.h>
namespace DB
{
@ -16,13 +16,19 @@ namespace MySQLProtocol
/* Writes and reads packets, keeping sequence-id.
* Throws ProtocolError if a packet with an incorrect sequence-id was received.
*/
class PacketEndpoint : public shared_ptr_helper<PacketEndpoint>
class PacketEndpoint : boost::noncopyable
{
public:
uint8_t & sequence_id;
ReadBuffer * in;
WriteBuffer * out;
/// For writing.
PacketEndpoint(WriteBuffer & out_, uint8_t & sequence_id_);
/// For reading and writing.
PacketEndpoint(ReadBuffer & in_, WriteBuffer & out_, uint8_t & sequence_id_);
MySQLPacketPayloadReadBuffer getPayload();
void receivePacket(IMySQLReadPacket & packet);
@ -43,15 +49,6 @@ public:
/// Converts packet to text. Is used for debug output.
static String packetToText(const String & payload);
protected:
/// For writing.
PacketEndpoint(WriteBuffer & out_, uint8_t & sequence_id_);
/// For reading and writing.
PacketEndpoint(ReadBuffer & in_, WriteBuffer & out_, uint8_t & sequence_id_);
friend struct shared_ptr_helper<PacketEndpoint>;
};
using PacketEndpointPtr = std::shared_ptr<PacketEndpoint>;
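
For context, this is the same refactoring applied throughout the commit: classes that used to inherit shared_ptr_helper<T> (protected constructors plus a static create() factory) now expose public constructors, inherit boost::noncopyable, and are constructed with std::make_shared at the call sites. A minimal sketch with a hypothetical ExampleEndpoint class:

#include <boost/noncopyable.hpp>
#include <memory>
#include <string>
#include <utility>

/// New convention: public constructor, non-copyable, created via std::make_shared.
class ExampleEndpoint : private boost::noncopyable
{
public:
    explicit ExampleEndpoint(std::string name_) : name(std::move(name_)) {}
    const std::string & getName() const { return name; }

private:
    std::string name;
};

using ExampleEndpointPtr = std::shared_ptr<ExampleEndpoint>;

int main()
{
    /// Call sites use std::make_shared directly instead of ExampleEndpoint::create(...).
    ExampleEndpointPtr endpoint = std::make_shared<ExampleEndpoint>("mysql");
    return endpoint->getName().empty() ? 1 : 0;
}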

View File

@ -122,7 +122,7 @@ void Settings::checkNoSettingNamesAtTopLevel(const Poco::Util::AbstractConfigura
for (auto setting : settings.all())
{
const auto & name = setting.getName();
if (config.has(name))
if (config.has(name) && !setting.isObsolete())
{
throw Exception(fmt::format("A setting '{}' appeared at top level in config {}."
" But it is user-level setting that should be located in users.xml inside <profiles> section for specific profile."

View File

@ -725,7 +725,10 @@ class IColumn;
\
M(Bool, output_format_arrow_low_cardinality_as_dictionary, false, "Enable output LowCardinality type as Dictionary Arrow type", 0) \
\
M(EnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::EnumComparingMode::BY_VALUES, "How to map ClickHouse Enum and CapnProto Enum", 0)\
M(EnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::EnumComparingMode::BY_VALUES, "How to map ClickHouse Enum and CapnProto Enum", 0) \
\
M(String, input_format_mysql_dump_table_name, "", "Name of the table in MySQL dump from which to read data", 0) \
M(Bool, input_format_mysql_dump_map_column_names, true, "Match columns from table in MySQL dump and columns from ClickHouse table by names", 0) \
// End of FORMAT_FACTORY_SETTINGS
// Please add settings non-related to formats into the COMMON_SETTINGS above.

View File

@ -31,7 +31,7 @@ namespace
DictionaryStructure dictionary_structure = ExternalDictionariesLoader::getDictionaryStructure(*load_result.config);
auto comment = load_result.config->config->getString("dictionary.comment", "");
return StorageDictionary::create(
return std::make_shared<StorageDictionary>(
StorageID(database_name, load_result.name),
load_result.name,
dictionary_structure,

View File

@ -236,7 +236,7 @@ void DatabaseMySQL::fetchLatestTablesStructureIntoCache(
local_tables_cache[table_name] = std::make_pair(
table_modification_time,
StorageMySQL::create(
std::make_shared<StorageMySQL>(
StorageID(database_name, table_name),
std::move(mysql_pool),
database_name_in_mysql,

View File

@ -94,12 +94,12 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
if (storage)
{
/// Nested table was already created and synchronized.
storage = StorageMaterializedPostgreSQL::create(storage, getContext(), remote_database_name, table_name);
storage = std::make_shared<StorageMaterializedPostgreSQL>(storage, getContext(), remote_database_name, table_name);
}
else
{
/// Nested table does not exist and will be created by replication thread.
storage = StorageMaterializedPostgreSQL::create(StorageID(database_name, table_name), getContext(), remote_database_name, table_name);
storage = std::make_shared<StorageMaterializedPostgreSQL>(StorageID(database_name, table_name), getContext(), remote_database_name, table_name);
}
/// Cache MaterializedPostgreSQL wrapper over nested table.
@ -210,7 +210,7 @@ ASTPtr DatabaseMaterializedPostgreSQL::getCreateTableQueryImpl(const String & ta
std::lock_guard lock(handler_mutex);
auto storage = StorageMaterializedPostgreSQL::create(StorageID(database_name, table_name), getContext(), remote_database_name, table_name);
auto storage = std::make_shared<StorageMaterializedPostgreSQL>(StorageID(database_name, table_name), getContext(), remote_database_name, table_name);
auto ast_storage = replication_handler->getCreateNestedTableQuery(storage.get(), table_name);
assert_cast<ASTCreateQuery *>(ast_storage.get())->uuid = UUIDHelpers::generateV4();
return ast_storage;
@ -291,7 +291,7 @@ void DatabaseMaterializedPostgreSQL::attachTable(ContextPtr context_, const Stri
InterpreterAlterQuery(alter_query, current_context).execute();
auto storage = StorageMaterializedPostgreSQL::create(table, getContext(), remote_database_name, table_name);
auto storage = std::make_shared<StorageMaterializedPostgreSQL>(table, getContext(), remote_database_name, table_name);
materialized_tables[table_name] = storage;
std::lock_guard lock(handler_mutex);

View File

@ -187,7 +187,7 @@ StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, ContextPtr,
if (!columns_info)
return StoragePtr{};
auto storage = StoragePostgreSQL::create(
auto storage = std::make_shared<StoragePostgreSQL>(
StorageID(database_name, table_name), pool, table_name,
ColumnsDescription{columns_info->columns}, ConstraintsDescription{}, String{}, configuration.schema, configuration.on_conflict);

View File

@ -145,7 +145,7 @@ StoragePtr DatabaseSQLite::fetchTable(const String & table_name, ContextPtr loca
if (!columns)
return StoragePtr{};
auto storage = StorageSQLite::create(
auto storage = std::make_shared<StorageSQLite>(
StorageID(database_name, table_name),
sqlite_db,
database_path,

View File

@ -504,7 +504,7 @@ Pipe CacheDictionary<dictionary_key_type>::read(const Names & column_names, size
}
std::shared_ptr<const IDictionary> dictionary = shared_from_this();
auto coordinator = DictionarySourceCoordinator::create(dictionary, column_names, std::move(key_columns), max_block_size);
auto coordinator = std::make_shared<DictionarySourceCoordinator>(dictionary, column_names, std::move(key_columns), max_block_size);
auto result = coordinator->read(num_streams);
return result;

View File

@ -1,5 +1,6 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <memory>
#include <Columns/IColumn.h>
#include <Core/Names.h>
@ -13,18 +14,14 @@ namespace DB
class DictionarySource;
class DictionarySourceCoordinator final : public shared_ptr_helper<DictionarySourceCoordinator>, public std::enable_shared_from_this<DictionarySourceCoordinator>
class DictionarySourceCoordinator final : public std::enable_shared_from_this<DictionarySourceCoordinator>
, private boost::noncopyable
{
friend struct shared_ptr_helper<DictionarySourceCoordinator>;
public:
using ReadColumnsFunc = std::function<Columns (const Strings &, const DataTypes &, const Columns &, const DataTypes &, const Columns &)>;
Pipe read(size_t num_streams);
private:
explicit DictionarySourceCoordinator(
std::shared_ptr<const IDictionary> dictionary_,
const Names & column_names,
@ -85,6 +82,8 @@ private:
initialize(column_names);
}
private:
friend class DictionarySource;
bool getKeyColumnsNextRangeToRead(ColumnsWithTypeAndName & key_columns, ColumnsWithTypeAndName & data_columns);

View File

@ -573,7 +573,7 @@ Pipe FlatDictionary::read(const Names & column_names, size_t max_block_size, siz
ColumnsWithTypeAndName key_columns = {ColumnWithTypeAndName(keys_column, std::make_shared<DataTypeUInt64>(), dict_struct.id->name)};
std::shared_ptr<const IDictionary> dictionary = shared_from_this();
auto coordinator = DictionarySourceCoordinator::create(dictionary, column_names, std::move(key_columns), max_block_size);
auto coordinator = std::make_shared<DictionarySourceCoordinator>(dictionary, column_names, std::move(key_columns), max_block_size);
auto result = coordinator->read(num_streams);
return result;

View File

@ -758,7 +758,7 @@ Pipe HashedArrayDictionary<dictionary_key_type>::read(const Names & column_names
}
std::shared_ptr<const IDictionary> dictionary = shared_from_this();
auto coordinator = DictionarySourceCoordinator::create(dictionary, column_names, std::move(key_columns), max_block_size);
auto coordinator = std::make_shared<DictionarySourceCoordinator>(dictionary, column_names, std::move(key_columns), max_block_size);
auto result = coordinator->read(num_streams);
return result;

View File

@ -745,7 +745,7 @@ Pipe HashedDictionary<dictionary_key_type, sparse>::read(const Names & column_na
}
std::shared_ptr<const IDictionary> dictionary = shared_from_this();
auto coordinator = DictionarySourceCoordinator::create(dictionary, column_names, std::move(key_columns), max_block_size);
auto coordinator = std::make_shared<DictionarySourceCoordinator>(dictionary, column_names, std::move(key_columns), max_block_size);
auto result = coordinator->read(num_streams);
return result;

View File

@ -873,7 +873,7 @@ Pipe IPAddressDictionary::read(const Names & column_names, size_t max_block_size
}
std::shared_ptr<const IDictionary> dictionary = shared_from_this();
auto coordinator = DictionarySourceCoordinator::create(dictionary, column_names, std::move(key_columns_with_type), std::move(view_columns), max_block_size);
auto coordinator = std::make_shared<DictionarySourceCoordinator>(dictionary, column_names, std::move(key_columns_with_type), std::move(view_columns), max_block_size);
auto result = coordinator->read(num_streams);
return result;

View File

@ -1005,7 +1005,7 @@ Pipe RangeHashedDictionary<dictionary_key_type>::read(const Names & column_names
return result;
};
auto coordinator = DictionarySourceCoordinator::create(
auto coordinator = std::make_shared<DictionarySourceCoordinator>(
dictionary,
column_names,
std::move(key_columns),

View File

@ -60,8 +60,9 @@ void CachedReadBufferFromRemoteFS::appendFilesystemCacheLog(
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id,
.remote_file_path = remote_fs_object_path,
.source_file_path = remote_fs_object_path,
.file_segment_range = { file_segment_range.left, file_segment_range.right },
.file_segment_size = file_segment_range.size(),
};
switch (type)

View File

@ -7,6 +7,12 @@
#include <Common/logger_useful.h>
#include <Interpreters/FilesystemCacheLog.h>
namespace CurrentMetrics
{
extern const Metric FilesystemCacheReadBuffers;
}
namespace DB
{
@ -110,6 +116,8 @@ private:
String query_id;
bool enable_logging = false;
CurrentMetrics::Increment metric_increment{CurrentMetrics::FilesystemCacheReadBuffers};
};
}

View File

@ -0,0 +1,69 @@
#include <Formats/ColumnMapping.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
}
void ColumnMapping::setupByHeader(const Block & header)
{
column_indexes_for_input_fields.resize(header.columns());
names_of_columns = header.getNames();
for (size_t i = 0; i < column_indexes_for_input_fields.size(); ++i)
column_indexes_for_input_fields[i] = i;
}
void ColumnMapping::addColumns(
const Names & column_names, const std::unordered_map<String, size_t> & column_indexes_by_names, const FormatSettings & settings)
{
std::vector<bool> read_columns(column_indexes_by_names.size(), false);
for (const auto & name : column_names)
{
names_of_columns.push_back(name);
const auto column_it = column_indexes_by_names.find(name);
if (column_it == column_indexes_by_names.end())
{
if (settings.skip_unknown_fields)
{
column_indexes_for_input_fields.push_back(std::nullopt);
continue;
}
throw Exception(
ErrorCodes::INCORRECT_DATA,
"Unknown field found in format header: '{}' at position {}\nSet the 'input_format_skip_unknown_fields' parameter explicitly to ignore and proceed",
name, column_indexes_for_input_fields.size());
}
const auto column_index = column_it->second;
if (read_columns[column_index])
throw Exception("Duplicate field found while parsing format header: " + name, ErrorCodes::INCORRECT_DATA);
read_columns[column_index] = true;
column_indexes_for_input_fields.emplace_back(column_index);
}
for (size_t i = 0; i != read_columns.size(); ++i)
{
if (!read_columns[i])
not_presented_columns.push_back(i);
}
}
void ColumnMapping::insertDefaultsForNotSeenColumns(MutableColumns & columns, std::vector<UInt8> & read_columns)
{
for (auto index : not_presented_columns)
{
columns[index]->insertDefault();
read_columns[index] = false;
}
}
}
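
addColumns() above maps column names found in the input data to positions in the table header, recording std::nullopt for unknown fields when skip_unknown_fields is enabled and collecting the indexes of header columns that never appear. A simplified, self-contained illustration of that mapping, with made-up column names and standard containers only:

#include <iostream>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

int main()
{
    const std::vector<std::string> header = {"id", "name", "value"};
    std::unordered_map<std::string, size_t> index_by_name;
    for (size_t i = 0; i < header.size(); ++i)
        index_by_name[header[i]] = i;

    /// Input data declares its own column order and may contain unknown fields.
    const std::vector<std::string> input_columns = {"value", "id", "comment"};
    std::vector<std::optional<size_t>> column_indexes_for_input_fields;
    for (const auto & name : input_columns)
    {
        auto it = index_by_name.find(name);
        if (it == index_by_name.end())
            column_indexes_for_input_fields.push_back(std::nullopt); /// unknown field, skipped
        else
            column_indexes_for_input_fields.push_back(it->second);
    }

    for (const auto & index : column_indexes_for_input_fields)
        std::cout << (index ? std::to_string(*index) : std::string("skipped")) << '\n'; /// prints 2, 0, skipped
    return 0;
}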

View File

@ -0,0 +1,36 @@
#pragma once
#include <Core/Block.h>
#include <Formats/FormatSettings.h>
namespace DB
{
/// Used for input text formats with headers/structure to map columns from the input
/// to columns in the header by name.
/// It's also used to pass info from the header between different InputFormats in ParallelParsing
struct ColumnMapping
{
/// Special flag for ParallelParsing. Non-atomic because there is a strict
/// `happens-before` between read and write accesses. See InputFormatParallelParsing
bool is_set{false};
/// Maps indexes of columns in the input file to indexes of table columns
using OptionalIndexes = std::vector<std::optional<size_t>>;
OptionalIndexes column_indexes_for_input_fields;
/// The list of column indexes that are not present in the input data.
std::vector<size_t> not_presented_columns;
/// The list of column names in input data. Needed for better exception messages.
std::vector<String> names_of_columns;
void setupByHeader(const Block & header);
void addColumns(
const Names & column_names, const std::unordered_map<String, size_t> & column_indexes_by_names, const FormatSettings & settings);
void insertDefaultsForNotSeenColumns(MutableColumns & columns, std::vector<UInt8> & read_columns);
};
}

View File

@ -150,6 +150,8 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.msgpack.output_uuid_representation = settings.output_format_msgpack_uuid_representation;
format_settings.max_rows_to_read_for_schema_inference = settings.input_format_max_rows_to_read_for_schema_inference;
format_settings.column_names_for_schema_inference = settings.column_names_for_schema_inference;
format_settings.mysql_dump.table_name = settings.input_format_mysql_dump_table_name;
format_settings.mysql_dump.map_column_names = settings.input_format_mysql_dump_map_column_names;
/// Validate avro_schema_registry_url with RemoteHostFilter when non-empty and in Server context
if (format_settings.schema.is_server)
@ -220,7 +222,6 @@ InputFormatPtr FormatFactory::getInput(
(ReadBuffer & input) -> InputFormatPtr
{ return input_getter(input, sample, row_input_format_params, format_settings); };
ParallelParsingInputFormat::Params params{
buf, sample, parser_creator, file_segmentation_engine, name, settings.max_threads, settings.min_chunk_bytes_for_parallel_parsing,
context->getApplicationType() == Context::ApplicationType::SERVER};

View File

@ -262,6 +262,12 @@ struct FormatSettings
UInt64 number_of_columns = 0;
MsgPackUUIDRepresentation output_uuid_representation = MsgPackUUIDRepresentation::EXT;
} msgpack;
struct MySQLDump
{
String table_name;
bool map_column_names = true;
} mysql_dump;
};
}

View File

@ -15,6 +15,9 @@ void registerFileSegmentationEngineRegexp(FormatFactory & factory);
void registerFileSegmentationEngineJSONAsString(FormatFactory & factory);
void registerFileSegmentationEngineJSONAsObject(FormatFactory & factory);
void registerFileSegmentationEngineJSONCompactEachRow(FormatFactory & factory);
#if USE_HIVE
void registerFileSegmentationEngineHiveText(FormatFactory & factory);
#endif
/// Formats for both input/output.
@ -55,6 +58,8 @@ void registerInputFormatRawBLOB(FormatFactory & factory);
void registerOutputFormatRawBLOB(FormatFactory & factory);
void registerInputFormatCustomSeparated(FormatFactory & factory);
void registerOutputFormatCustomSeparated(FormatFactory & factory);
void registerInputFormatCapnProto(FormatFactory & factory);
void registerOutputFormatCapnProto(FormatFactory & factory);
/// Output only (presentational) formats.
@ -71,7 +76,6 @@ void registerOutputFormatNull(FormatFactory & factory);
void registerOutputFormatMySQLWire(FormatFactory & factory);
void registerOutputFormatMarkdown(FormatFactory & factory);
void registerOutputFormatPostgreSQLWire(FormatFactory & factory);
void registerOutputFormatCapnProto(FormatFactory & factory);
void registerOutputFormatPrometheus(FormatFactory & factory);
/// Input only formats.
@ -80,7 +84,7 @@ void registerInputFormatRegexp(FormatFactory & factory);
void registerInputFormatJSONAsString(FormatFactory & factory);
void registerInputFormatJSONAsObject(FormatFactory & factory);
void registerInputFormatLineAsString(FormatFactory & factory);
void registerInputFormatCapnProto(FormatFactory & factory);
void registerInputFormatMySQLDump(FormatFactory & factory);
#if USE_HIVE
void registerInputFormatHiveText(FormatFactory & factory);
@ -114,6 +118,7 @@ void registerRegexpSchemaReader(FormatFactory & factory);
void registerTSKVSchemaReader(FormatFactory & factory);
void registerValuesSchemaReader(FormatFactory & factory);
void registerTemplateSchemaReader(FormatFactory & factory);
void registerMySQLSchemaReader(FormatFactory & factory);
void registerFileExtensions(FormatFactory & factory);
@ -128,6 +133,10 @@ void registerFormats()
registerFileSegmentationEngineJSONAsString(factory);
registerFileSegmentationEngineJSONAsObject(factory);
registerFileSegmentationEngineJSONCompactEachRow(factory);
#if USE_HIVE
registerFileSegmentationEngineHiveText(factory);
#endif
registerInputFormatNative(factory);
registerOutputFormatNative(factory);
@ -193,6 +202,7 @@ void registerFormats()
#endif
registerInputFormatCapnProto(factory);
registerInputFormatMySQLDump(factory);
registerNonTrivialPrefixAndSuffixCheckerJSONEachRow(factory);
registerNonTrivialPrefixAndSuffixCheckerJSONAsString(factory);
@ -221,6 +231,7 @@ void registerFormats()
registerTSKVSchemaReader(factory);
registerValuesSchemaReader(factory);
registerTemplateSchemaReader(factory);
registerMySQLSchemaReader(factory);
}
}

View File

@ -3283,6 +3283,19 @@ private:
return res;
};
}
else if (checkAndGetDataType<DataTypeObject>(from_type.get()))
{
return [is_nullable = to_type->hasNullableSubcolumns()] (ColumnsWithTypeAndName & arguments, const DataTypePtr & , const ColumnNullable * , size_t) -> ColumnPtr
{
auto & column_object = assert_cast<const ColumnObject &>(*arguments.front().column);
auto res = ColumnObject::create(is_nullable);
for (size_t i = 0; i < column_object.size(); i++)
res->insert(column_object[i]);
res->finalize();
return res;
};
}
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Cast to Object can be performed only from flatten named Tuple, Map or String. Got: {}", from_type->getName());

View File

@ -373,8 +373,8 @@ struct MinHashImpl
std::map<UInt64, BytesRef, Comp> values;
};
using MaxHeap = Heap<std::less<size_t>>;
using MinHeap = Heap<std::greater<size_t>>;
using MaxHeap = Heap<std::less<>>;
using MinHeap = Heap<std::greater<>>;
static ALWAYS_INLINE inline void ngramHashASCII(
MinHeap & min_heap,

View File

@ -1,5 +1,6 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <base/types.h>
#include <memory>
@ -11,7 +12,7 @@ class ReadBufferFromFileBase;
class SeekableReadBuffer;
/// Interface for reading an archive.
class IArchiveReader : public std::enable_shared_from_this<IArchiveReader>
class IArchiveReader : public std::enable_shared_from_this<IArchiveReader>, boost::noncopyable
{
public:
virtual ~IArchiveReader() = default;

View File

@ -1,5 +1,6 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <base/types.h>
#include <memory>
@ -9,7 +10,7 @@ namespace DB
class WriteBufferFromFileBase;
/// Interface for writing an archive.
class IArchiveWriter : public std::enable_shared_from_this<IArchiveWriter>
class IArchiveWriter : public std::enable_shared_from_this<IArchiveWriter>, boost::noncopyable
{
public:
/// Destructor finalizes writing the archive.

View File

@ -4,7 +4,6 @@
#if USE_MINIZIP
#include <IO/Archives/IArchiveReader.h>
#include <base/shared_ptr_helper.h>
#include <mutex>
#include <vector>
@ -16,9 +15,16 @@ class ReadBufferFromFileBase;
class SeekableReadBuffer;
/// Implementation of IArchiveReader for reading zip archives.
class ZipArchiveReader : public shared_ptr_helper<ZipArchiveReader>, public IArchiveReader
class ZipArchiveReader : public IArchiveReader
{
public:
/// Constructs an archive's reader that will read from a file in the local filesystem.
explicit ZipArchiveReader(const String & path_to_archive_);
/// Constructs an archive's reader that will read by making a read buffer by using
/// a specified function.
ZipArchiveReader(const String & path_to_archive_, const ReadArchiveFunction & archive_read_function_, UInt64 archive_size_);
~ZipArchiveReader() override;
/// Returns true if there is a specified file in the archive.
@ -43,14 +49,6 @@ public:
void setPassword(const String & password_) override;
private:
/// Constructs an archive's reader that will read from a file in the local filesystem.
explicit ZipArchiveReader(const String & path_to_archive_);
/// Constructs an archive's reader that will read by making a read buffer by using
/// a specified function.
ZipArchiveReader(const String & path_to_archive_, const ReadArchiveFunction & archive_read_function_, UInt64 archive_size_);
friend struct shared_ptr_helper<ZipArchiveReader>;
class ReadBufferFromZipArchive;
class FileEnumeratorImpl;
class HandleHolder;

View File

@ -4,7 +4,6 @@
#if USE_MINIZIP
#include <IO/Archives/IArchiveWriter.h>
#include <base/shared_ptr_helper.h>
#include <mutex>
@ -14,9 +13,15 @@ class WriteBuffer;
class WriteBufferFromFileBase;
/// Implementation of IArchiveWriter for writing zip archives.
class ZipArchiveWriter : public shared_ptr_helper<ZipArchiveWriter>, public IArchiveWriter
class ZipArchiveWriter : public IArchiveWriter
{
public:
/// Constructs an archive that will be written as a file in the local filesystem.
explicit ZipArchiveWriter(const String & path_to_archive_);
/// Constructs an archive that will be written by using a specified `archive_write_buffer_`.
ZipArchiveWriter(const String & path_to_archive_, std::unique_ptr<WriteBuffer> archive_write_buffer_);
/// Destructor finalizes writing the archive.
~ZipArchiveWriter() override;
@ -63,13 +68,6 @@ public:
static void checkEncryptionIsEnabled();
private:
/// Constructs an archive that will be written as a file in the local filesystem.
explicit ZipArchiveWriter(const String & path_to_archive_);
/// Constructs an archive that will be written by using a specified `archive_write_buffer_`.
ZipArchiveWriter(const String & path_to_archive_, std::unique_ptr<WriteBuffer> archive_write_buffer_);
friend struct shared_ptr_helper<ZipArchiveWriter>;
class WriteBufferFromZipArchive;
class HandleHolder;
using RawHandle = void *;

View File

@ -26,7 +26,7 @@ std::shared_ptr<IArchiveReader> createArchiveReader(
if (path_to_archive.ends_with(".zip") || path_to_archive.ends_with(".zipx"))
{
#if USE_MINIZIP
return ZipArchiveReader::create(path_to_archive, archive_read_function, archive_size);
return std::make_shared<ZipArchiveReader>(path_to_archive, archive_read_function, archive_size);
#else
throw Exception("minizip library is disabled", ErrorCodes::SUPPORT_IS_DISABLED);
#endif

View File

@ -26,7 +26,7 @@ std::shared_ptr<IArchiveWriter> createArchiveWriter(
if (path_to_archive.ends_with(".zip") || path_to_archive.ends_with(".zipx"))
{
#if USE_MINIZIP
return ZipArchiveWriter::create(path_to_archive, std::move(archive_write_buffer));
return std::make_shared<ZipArchiveWriter>(path_to_archive, std::move(archive_write_buffer));
#else
throw Exception("minizip library is disabled", ErrorCodes::SUPPORT_IS_DISABLED);
#endif

View File

@ -19,6 +19,8 @@ public:
const ReadBuffer & getWrappedReadBuffer() const { return *in; }
ReadBuffer & getWrappedReadBuffer() { return *in; }
void prefetch() override { in->prefetch(); }
protected:
std::unique_ptr<ReadBuffer> in;
};

View File

@ -24,6 +24,8 @@ public:
~PeekableReadBuffer() override;
void prefetch() override { sub_buf.prefetch(); }
/// Sets checkpoint at current position
ALWAYS_INLINE inline void setCheckpoint()
{

View File

@ -19,9 +19,9 @@
namespace ProfileEvents
{
extern const Event S3ReadMicroseconds;
extern const Event S3ReadBytes;
extern const Event S3ReadRequestsErrors;
extern const Event ReadBufferFromS3Microseconds;
extern const Event ReadBufferFromS3Bytes;
extern const Event ReadBufferFromS3RequestsErrors;
extern const Event ReadBufferSeekCancelConnection;
}
@ -121,14 +121,14 @@ bool ReadBufferFromS3::nextImpl()
/// Try to read a next portion of data.
next_result = impl->next();
watch.stop();
ProfileEvents::increment(ProfileEvents::S3ReadMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3Microseconds, watch.elapsedMicroseconds());
break;
}
catch (const Exception & e)
{
watch.stop();
ProfileEvents::increment(ProfileEvents::S3ReadMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::S3ReadRequestsErrors, 1);
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3Microseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3RequestsErrors, 1);
LOG_DEBUG(
log,
@ -157,7 +157,7 @@ bool ReadBufferFromS3::nextImpl()
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset()); /// use the buffer returned by `impl`
ProfileEvents::increment(ProfileEvents::S3ReadBytes, working_buffer.size());
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3Bytes, working_buffer.size());
offset += working_buffer.size();
return true;

View File

@ -588,7 +588,10 @@ void readQuotedStringWithSQLStyle(String & s, ReadBuffer & buf)
template void readQuotedStringInto<true>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readQuotedStringInto<true>(String & s, ReadBuffer & buf);
template void readQuotedStringInto<false>(String & s, ReadBuffer & buf);
template void readDoubleQuotedStringInto<false>(NullOutput & s, ReadBuffer & buf);
template void readDoubleQuotedStringInto<false>(String & s, ReadBuffer & buf);
template void readBackQuotedStringInto<false>(String & s, ReadBuffer & buf);
void readDoubleQuotedString(String & s, ReadBuffer & buf)
{

View File

@ -20,7 +20,7 @@
namespace ProfileEvents
{
extern const Event S3WriteBytes;
extern const Event WriteBufferFromS3Bytes;
extern const Event RemoteFSCacheDownloadBytes;
}
@ -121,7 +121,7 @@ void WriteBufferFromS3::nextImpl()
}
}
ProfileEvents::increment(ProfileEvents::S3WriteBytes, offset());
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Bytes, offset());
last_part_size += offset();

View File

@ -52,7 +52,7 @@ void writeException(const Exception & e, WriteBuffer & buf, bool with_stack_trac
{
writeBinary(e.code(), buf);
writeBinary(String(e.name()), buf);
writeBinary(e.displayText(), buf);
writeBinary(e.displayText() + getExtraExceptionInfo(e), buf);
if (with_stack_trace)
writeBinary(e.getStackTraceString(), buf);

View File

@ -1,5 +1,6 @@
#include <IO/ZstdDeflatingAppendableWriteBuffer.h>
#include <Common/Exception.h>
#include <IO/ReadBufferFromFile.h>
namespace DB
{
@ -10,10 +11,15 @@ namespace ErrorCodes
}
ZstdDeflatingAppendableWriteBuffer::ZstdDeflatingAppendableWriteBuffer(
std::unique_ptr<WriteBuffer> out_, int compression_level, bool append_to_existing_stream_,
size_t buf_size, char * existing_memory, size_t alignment)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, append_to_existing_stream(append_to_existing_stream_)
std::unique_ptr<WriteBufferFromFile> out_,
int compression_level,
bool append_to_existing_file_,
size_t buf_size,
char * existing_memory,
size_t alignment)
: BufferWithOwnMemory(buf_size, existing_memory, alignment)
, out(std::move(out_))
, append_to_existing_file(append_to_existing_file_)
{
cctx = ZSTD_createCCtx();
if (cctx == nullptr)
@ -31,13 +37,11 @@ void ZstdDeflatingAppendableWriteBuffer::nextImpl()
if (!offset())
return;
ZSTD_EndDirective mode = ZSTD_e_flush;
input.src = reinterpret_cast<unsigned char *>(working_buffer.begin());
input.size = offset();
input.pos = 0;
if (first_write && append_to_existing_stream)
if (first_write && append_to_existing_file && isNeedToAddEmptyBlock())
{
addEmptyBlock();
first_write = false;
@ -54,11 +58,12 @@ void ZstdDeflatingAppendableWriteBuffer::nextImpl()
output.size = out->buffer().size();
output.pos = out->offset();
size_t compression_result = ZSTD_compressStream2(cctx, &output, &input, mode);
size_t compression_result = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_flush);
if (ZSTD_isError(compression_result))
throw Exception(
ErrorCodes::ZSTD_ENCODER_FAILED, "Zstd stream encoding failed: error code: {}; zstd version: {}", ZSTD_getErrorName(compression_result), ZSTD_VERSION_STRING);
first_write = false;
out->position() = out->buffer().begin() + output.pos;
bool everything_was_compressed = (input.pos == input.size);
@ -73,6 +78,7 @@ void ZstdDeflatingAppendableWriteBuffer::nextImpl()
out->position() = out->buffer().begin();
throw;
}
}
ZstdDeflatingAppendableWriteBuffer::~ZstdDeflatingAppendableWriteBuffer()
@ -87,10 +93,22 @@ void ZstdDeflatingAppendableWriteBuffer::finalizeImpl()
/// To free cctx
finalizeZstd();
/// Nothing was written
return;
}
WriteBufferDecorator::finalizeImpl();
else
{
try
{
finalizeBefore();
out->finalize();
finalizeAfter();
}
catch (...)
{
/// Do not try to flush next time after exception.
out->position() = out->buffer().begin();
throw;
}
}
}
void ZstdDeflatingAppendableWriteBuffer::finalizeBefore()
@ -107,11 +125,15 @@ void ZstdDeflatingAppendableWriteBuffer::finalizeBefore()
output.size = out->buffer().size();
output.pos = out->offset();
/// Actually we could use ZSTD_e_flush here and add an empty termination
/// block on each new buffer creation for a non-empty file unconditionally (without isNeedToAddEmptyBlock).
/// However, ZSTD_decompressStream is able to read a non-terminated frame (we use it in the reader buffer),
/// but the console zstd utility cannot.
size_t remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end);
while (remaining != 0)
{
if (ZSTD_isError(remaining))
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "zstd stream encoder end failed: error: '{}' zstd version: {}", ZSTD_getErrorName(remaining), ZSTD_VERSION_STRING);
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "Zstd stream encoder end failed: error: '{}' zstd version: {}", ZSTD_getErrorName(remaining), ZSTD_VERSION_STRING);
remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end);
}
@ -143,14 +165,40 @@ void ZstdDeflatingAppendableWriteBuffer::finalizeZstd()
void ZstdDeflatingAppendableWriteBuffer::addEmptyBlock()
{
/// HACK: https://github.com/facebook/zstd/issues/2090#issuecomment-620158967
static const char empty_block[3] = {0x01, 0x00, 0x00};
if (out->buffer().size() - out->offset() < sizeof(empty_block))
if (out->buffer().size() - out->offset() < ZSTD_CORRECT_TERMINATION_LAST_BLOCK.size())
out->next();
std::memcpy(out->buffer().begin() + out->offset(), empty_block, sizeof(empty_block));
std::memcpy(out->buffer().begin() + out->offset(),
ZSTD_CORRECT_TERMINATION_LAST_BLOCK.data(), ZSTD_CORRECT_TERMINATION_LAST_BLOCK.size());
out->position() = out->buffer().begin() + out->offset() + sizeof(empty_block);
out->position() = out->buffer().begin() + out->offset() + ZSTD_CORRECT_TERMINATION_LAST_BLOCK.size();
}
bool ZstdDeflatingAppendableWriteBuffer::isNeedToAddEmptyBlock()
{
ReadBufferFromFile reader(out->getFileName());
auto fsize = reader.size();
if (fsize > 3)
{
std::array<char, 3> result;
reader.seek(fsize - 3, SEEK_SET);
reader.readStrict(result.data(), 3);
/// If we don't have the correct block at the end, then we need to add it manually.
/// NOTE: we may see the same bytes in case of data corruption or an unfinished write,
/// but in that case the file is corrupted anyway and we have to remove it.
return result != ZSTD_CORRECT_TERMINATION_LAST_BLOCK;
}
else if (fsize > 0)
{
throw Exception(
ErrorCodes::ZSTD_ENCODER_FAILED,
"Trying to write to non-empty file '{}' with tiny size {}. It can lead to data corruption",
out->getFileName(), fsize);
}
return false;
}
}
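
isNeedToAddEmptyBlock() above decides whether the three-byte zstd empty-block terminator has to be appended before new data is written to an existing file. A rough standalone equivalent using std::ifstream instead of ReadBufferFromFile; the helper name fileNeedsEmptyBlock is hypothetical and the error case for files of size 1..3 bytes is simplified away:

#include <array>
#include <fstream>
#include <string>

/// Returns true if the file is non-empty and does not end with the zstd
/// empty-block terminator {0x01, 0x00, 0x00}.
static bool fileNeedsEmptyBlock(const std::string & path)
{
    static constexpr std::array<char, 3> terminator{0x01, 0x00, 0x00};

    std::ifstream file(path, std::ios::binary | std::ios::ate);
    const std::streamoff size = file ? static_cast<std::streamoff>(file.tellg()) : 0;
    if (size <= 3)
        return false; /// the real code throws for sizes 1..3, since such a file is corrupted anyway

    std::array<char, 3> last_bytes{};
    file.seekg(-3, std::ios::end);
    file.read(last_bytes.data(), last_bytes.size());
    return last_bytes != terminator;
}

int main()
{
    return fileNeedsEmptyBlock("changelog_1_100.bin.zstd") ? 1 : 0; /// file name is illustrative
}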

View File

@ -4,6 +4,7 @@
#include <IO/CompressionMethod.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferDecorator.h>
#include <IO/WriteBufferFromFile.h>
#include <zstd.h>
@ -15,18 +16,22 @@ namespace DB
/// Main differences from ZstdDeflatingWriteBuffer:
/// 1) Allows continuing to write to the same output even if finalize() (or the destructor) was not called, for example
/// when the server was killed with signal 9. Natively zstd doesn't support such a feature because
/// ZSTD_decompressStream expects to see an empty block at the end of each frame. There is no API function for it
/// ZSTD_decompressStream expects to see an empty block (3 bytes 0x01, 0x00, 0x00) at the end of each frame. There is no API function for it
/// so we just use a HACK and add the empty block manually on the first write (see addEmptyBlock). Maintainers of zstd
/// said that there are no risks of compatibility issues: https://github.com/facebook/zstd/issues/2090#issuecomment-620158967.
/// 2) Doesn't support internal ZSTD checksumming, because ZSTD checksums are written at the end of the frame (frame epilogue).
///
class ZstdDeflatingAppendableWriteBuffer : public WriteBufferWithOwnMemoryDecorator
class ZstdDeflatingAppendableWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
{
public:
using ZSTDLastBlock = const std::array<char, 3>;
/// Frame end block. If we read a non-empty file and don't see this block at the end, we should add it.
static inline constexpr ZSTDLastBlock ZSTD_CORRECT_TERMINATION_LAST_BLOCK = {0x01, 0x00, 0x00};
ZstdDeflatingAppendableWriteBuffer(
std::unique_ptr<WriteBuffer> out_,
std::unique_ptr<WriteBufferFromFile> out_,
int compression_level,
bool append_to_existing_stream_, /// if true then out mustn't be empty
bool append_to_existing_file_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
@ -39,6 +44,8 @@ public:
out->sync();
}
WriteBuffer * getNestedBuffer() { return out.get(); }
private:
/// NOTE: will fill compressed data to the out.working_buffer, but will not call out.next method until the buffer is full
void nextImpl() override;
@ -50,15 +57,20 @@ private:
/// After the first call to this function, subsequent calls will have no effect and
/// an attempt to write to this buffer will result in exception.
void finalizeImpl() override;
void finalizeBefore() override;
void finalizeAfter() override;
void finalizeBefore();
void finalizeAfter();
void finalizeZstd();
/// Adding zstd empty block to out.working_buffer
/// Reads the last three bytes from a non-empty compressed file and compares them with
/// ZSTD_CORRECT_TERMINATION_LAST_BLOCK.
bool isNeedToAddEmptyBlock();
/// Adds the zstd empty block (ZSTD_CORRECT_TERMINATION_LAST_BLOCK) to out.working_buffer
void addEmptyBlock();
/// We appending data to existing stream so on the first nextImpl call we
/// will append empty block.
bool append_to_existing_stream;
std::unique_ptr<WriteBufferFromFile> out;
bool append_to_existing_file = false;
ZSTD_CCtx * cctx;
ZSTD_inBuffer input;
ZSTD_outBuffer output;

View File

@ -564,13 +564,12 @@ Aggregator::Aggregator(const Params & params_) : params(params_)
void Aggregator::compileAggregateFunctionsIfNeeded()
{
static std::unordered_map<UInt128, UInt64, UInt128Hash> aggregate_functions_description_to_count;
static std::mutex mtx;
static std::mutex mutex;
if (!params.compile_aggregate_expressions)
return;
std::vector<AggregateFunctionWithOffset> functions_to_compile;
size_t aggregate_instructions_size = 0;
String functions_description;
is_aggregate_function_compiled.resize(aggregate_functions.size());
@ -598,7 +597,6 @@ void Aggregator::compileAggregateFunctionsIfNeeded()
functions_description += ' ';
}
++aggregate_instructions_size;
is_aggregate_function_compiled[i] = function->isCompilable();
}
@ -612,10 +610,11 @@ void Aggregator::compileAggregateFunctionsIfNeeded()
aggregate_functions_description_hash.get128(aggregate_functions_description_hash_key);
{
std::lock_guard<std::mutex> lock(mtx);
std::lock_guard<std::mutex> lock(mutex);
if (aggregate_functions_description_to_count[aggregate_functions_description_hash_key]++ < params.min_count_to_compile_aggregate_expression)
return;
}
if (auto * compilation_cache = CompiledExpressionCacheFactory::instance().tryGetCache())
{
@ -634,7 +633,6 @@ void Aggregator::compileAggregateFunctionsIfNeeded()
auto compiled_aggregate_functions = compileAggregateFunctions(getJITInstance(), functions_to_compile, functions_description);
compiled_aggregate_functions_holder = std::make_shared<CompiledAggregateFunctionsHolder>(std::move(compiled_aggregate_functions));
}
}
}
#endif
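
The block above keeps a process-wide counter of how many times each aggregate-function description has been seen, protected by a static mutex, and only proceeds to JIT compilation once the count reaches min_count_to_compile_aggregate_expression. A stripped-down sketch of that gate; shouldCompile is a hypothetical name and a string description stands in for the UInt128 hash:

#include <iostream>
#include <mutex>
#include <string>
#include <unordered_map>

/// Process-wide counter keyed by a description of the aggregate functions, guarded by a static mutex.
static bool shouldCompile(const std::string & description, size_t min_count_to_compile)
{
    static std::unordered_map<std::string, size_t> counts;
    static std::mutex mutex;

    std::lock_guard<std::mutex> lock(mutex);
    return counts[description]++ >= min_count_to_compile;
}

int main()
{
    /// With a threshold of 3, only the fourth sighting of the same description triggers compilation.
    for (size_t i = 0; i < 4; ++i)
        std::cout << (shouldCompile("sum(UInt64) avg(Float64)", 3) ? "compile" : "skip") << '\n';
    return 0;
}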

View File

@ -455,6 +455,8 @@ struct ContextSharedPart
delete_message_broker_schedule_pool.reset();
delete_ddl_worker.reset();
delete_access_control.reset();
total_memory_tracker.resetOvercommitTracker();
}
bool hasTraceCollector() const
@ -3314,7 +3316,7 @@ void Context::initializeBackgroundExecutorsIfNeeded()
background_common_pool_size = config.getUInt64("profiles.default.background_common_pool_size");
/// With this executor we can execute more tasks than threads we have
shared->merge_mutate_executor = MergeMutateBackgroundExecutor::create
shared->merge_mutate_executor = std::make_shared<MergeMutateBackgroundExecutor>
(
"MergeMutate",
/*max_threads_count*/background_pool_size,
@ -3324,7 +3326,7 @@ void Context::initializeBackgroundExecutorsIfNeeded()
LOG_INFO(shared->log, "Initialized background executor for merges and mutations with num_threads={}, num_tasks={}",
background_pool_size, background_pool_size * background_merges_mutations_concurrency_ratio);
shared->moves_executor = OrdinaryBackgroundExecutor::create
shared->moves_executor = std::make_shared<OrdinaryBackgroundExecutor>
(
"Move",
background_move_pool_size,
@ -3333,7 +3335,7 @@ void Context::initializeBackgroundExecutorsIfNeeded()
);
LOG_INFO(shared->log, "Initialized background executor for move operations with num_threads={}, num_tasks={}", background_move_pool_size, background_move_pool_size);
shared->fetch_executor = OrdinaryBackgroundExecutor::create
shared->fetch_executor = std::make_shared<OrdinaryBackgroundExecutor>
(
"Fetch",
background_fetches_pool_size,
@ -3342,7 +3344,7 @@ void Context::initializeBackgroundExecutorsIfNeeded()
);
LOG_INFO(shared->log, "Initialized background executor for fetches with num_threads={}, num_tasks={}", background_fetches_pool_size, background_fetches_pool_size);
shared->common_executor = OrdinaryBackgroundExecutor::create
shared->common_executor = std::make_shared<OrdinaryBackgroundExecutor>
(
"Common",
background_common_pool_size,
@ -3411,6 +3413,7 @@ ReadSettings Context::getReadSettings() const
res.remote_read_min_bytes_for_seek = settings.remote_read_min_bytes_for_seek;
res.local_fs_buffer_size = settings.max_read_buffer_size;
res.remote_fs_buffer_size = settings.max_read_buffer_size;
res.direct_io_threshold = settings.min_bytes_to_use_direct_io;
res.mmap_threshold = settings.min_bytes_to_use_mmap_io;
res.priority = settings.read_priority;

View File

@ -92,7 +92,7 @@ TemporaryTableHolder::TemporaryTableHolder(
context_,
[&](const StorageID & table_id)
{
auto storage = StorageMemory::create(table_id, ColumnsDescription{columns}, ConstraintsDescription{constraints}, String{});
auto storage = std::make_shared<StorageMemory>(table_id, ColumnsDescription{columns}, ConstraintsDescription{constraints}, String{});
if (create_for_global_subquery)
storage->delayReadForGlobalSubqueries();

View File

@ -33,8 +33,9 @@ NamesAndTypesList FilesystemCacheLogElement::getNamesAndTypes()
{"event_date", std::make_shared<DataTypeDate>()},
{"event_time", std::make_shared<DataTypeDateTime>()},
{"query_id", std::make_shared<DataTypeString>()},
{"remote_file_path", std::make_shared<DataTypeString>()},
{"source_file_path", std::make_shared<DataTypeString>()},
{"file_segment_range", std::make_shared<DataTypeTuple>(std::move(types))},
{"size", std::make_shared<DataTypeUInt64>()},
{"read_type", std::make_shared<DataTypeString>()},
};
}
@ -48,8 +49,9 @@ void FilesystemCacheLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(query_id);
columns[i++]->insert(remote_file_path);
columns[i++]->insert(source_file_path);
columns[i++]->insert(Tuple{file_segment_range.first, file_segment_range.second});
columns[i++]->insert(file_segment_size);
columns[i++]->insert(typeToString(read_type));
}

View File

@ -34,10 +34,11 @@ struct FilesystemCacheLogElement
time_t event_time{};
String query_id;
String remote_file_path;
String source_file_path;
std::pair<size_t, size_t> file_segment_range{};
ReadType read_type{};
size_t file_segment_size;
static std::string name() { return "FilesystemCacheLog"; }

View File

@ -69,7 +69,7 @@ static inline String resolveDatabase(
return current_database != replica_clickhouse_database ? "" : replica_clickhouse_database;
}
static NamesAndTypesList getColumnsList(const ASTExpressionList * columns_definition)
NamesAndTypesList getColumnsList(const ASTExpressionList * columns_definition)
{
NamesAndTypesList columns_name_and_type;
for (const auto & declare_column_ast : columns_definition->children)

View File

@ -8,6 +8,7 @@
#include <Parsers/MySQL/ASTAlterQuery.h>
#include <Parsers/MySQL/ASTCreateQuery.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTExpressionList.h>
namespace DB
{
@ -88,6 +89,8 @@ using InterpreterMySQLAlterQuery = InterpreterMySQLDDLQuery<InterpreterAlterImpl
using InterpreterMySQLRenameQuery = InterpreterMySQLDDLQuery<InterpreterRenameImpl>;
using InterpreterMySQLCreateQuery = InterpreterMySQLDDLQuery<InterpreterCreateImpl>;
NamesAndTypesList getColumnsList(const ASTExpressionList * columns_definition);
}
}

View File

@ -344,9 +344,9 @@ QueryStatus::~QueryStatus()
if (auto * memory_tracker = getMemoryTracker())
{
if (user_process_list)
user_process_list->user_overcommit_tracker.unsubscribe(memory_tracker);
user_process_list->user_overcommit_tracker.onQueryStop(memory_tracker);
if (auto shared_context = getContext())
shared_context->getGlobalOvercommitTracker()->unsubscribe(memory_tracker);
shared_context->getGlobalOvercommitTracker()->onQueryStop(memory_tracker);
}
}

View File

@ -12,7 +12,6 @@
#include <Parsers/Lexer.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
namespace DB

View File

@ -48,7 +48,7 @@ void UserDefinedSQLFunctionFactory::registerFunction(ContextPtr context, const S
if (if_not_exists)
return;
throw Exception(ErrorCodes::CANNOT_DROP_FUNCTION, "User defined executable function '{}'", function_name);
throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User defined executable function '{}' already exists", function_name);
}
std::lock_guard lock(mutex);

View File

@ -13,6 +13,7 @@
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ExpressionElementParsers.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/typeid_cast.h>
@ -37,7 +38,19 @@ std::pair<Field, std::shared_ptr<const IDataType>> evaluateConstantExpression(co
return std::make_pair(literal->value, applyVisitor(FieldToDataType(), literal->value));
NamesAndTypesList source_columns = {{ "_dummy", std::make_shared<DataTypeUInt8>() }};
auto ast = node->clone();
if (ast->as<ASTSubquery>() != nullptr)
{
/** For subqueries, getColumnName will return __subquery_ + 'hash' if there is no alias.
  * If there is an alias, getColumnName for the subquery will return the alias.
  * In the result block, the name of the subquery after the QueryAliasesVisitor pass will be _subquery1.
  * We specify an alias for the subquery because we need to get the column from the result block.
  */
ast->setAlias("constant_expression");
}
ReplaceQueryParameterVisitor param_visitor(context->getQueryParameters());
param_visitor.visit(ast);
@ -46,6 +59,12 @@ std::pair<Field, std::shared_ptr<const IDataType>> evaluateConstantExpression(co
String name = ast->getColumnName();
auto syntax_result = TreeRewriter(context).analyze(ast, source_columns);
/// The AST could potentially be transformed into a literal during TreeRewriter analysis.
/// For example, if we have a SQL user-defined function that returns a literal as a subquery.
if (ASTLiteral * literal = ast->as<ASTLiteral>())
return std::make_pair(literal->value, applyVisitor(FieldToDataType(), literal->value));
ExpressionActionsPtr expr_for_constant_folding = ExpressionAnalyzer(ast, syntax_result, context).getConstActions();
/// There must be at least one column in the block so that it knows the number of rows.

View File

@ -79,6 +79,7 @@ namespace ProfileEvents
extern const Event QueryTimeMicroseconds;
extern const Event SelectQueryTimeMicroseconds;
extern const Event InsertQueryTimeMicroseconds;
extern const Event OtherQueryTimeMicroseconds;
}
namespace DB
@ -801,6 +802,10 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
{
ProfileEvents::increment(ProfileEvents::InsertQueryTimeMicroseconds, query_time);
}
else
{
ProfileEvents::increment(ProfileEvents::OtherQueryTimeMicroseconds, query_time);
}
element.query_duration_ms = info.elapsed_seconds * 1000;

View File

@ -2,26 +2,12 @@
#include <Processors/ISource.h>
#include <IO/ReadBuffer.h>
#include <Interpreters/Context.h>
#include <Formats/ColumnMapping.h>
namespace DB
{
/// Used to pass info from header between different InputFormats in ParallelParsing
struct ColumnMapping
{
/// Non-atomic because there is strict `happens-before` between read and write access
/// See InputFormatParallelParsing
bool is_set{false};
/// Maps indexes of columns in the input file to indexes of table columns
using OptionalIndexes = std::vector<std::optional<size_t>>;
OptionalIndexes column_indexes_for_input_fields;
/// The list of column indexes that are not presented in input data.
std::vector<size_t> not_presented_columns;
/// The list of column names in input data. Needed for better exception messages.
std::vector<String> names_of_columns;
};
using ColumnMappingPtr = std::shared_ptr<ColumnMapping>;

View File

@ -4,6 +4,7 @@
#include <DataTypes/IDataType.h>
#include <Formats/FormatSettings.h>
#include <IO/ReadBuffer.h>
#include <Interpreters/Context.h>
namespace DB
{
@ -22,6 +23,9 @@ public:
/// Exceptions: JSON, TSKV.
virtual bool hasStrictOrderOfColumns() const { return true; }
virtual bool needContext() const { return false; }
virtual void setContext(ContextPtr &) {}
virtual ~ISchemaReader() = default;
protected:

View File

@ -298,7 +298,7 @@ void registerInputFormatCSV(FormatFactory & factory)
registerWithNamesAndTypes("CSV", register_func);
}
static std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size, size_t min_rows)
std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size, size_t min_rows)
{
char * pos = in.position();
bool quotes = false;

View File

@ -82,4 +82,6 @@ private:
CSVFormatReader reader;
};
std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size, size_t min_rows);
}

View File

@ -10,16 +10,20 @@ namespace ErrorCodes
}
static FormatSettings updateFormatSettings(const FormatSettings & settings)
static FormatSettings updateFormatSettings(const FormatSettings & settings, const Block & header)
{
FormatSettings updated = settings;
updated.skip_unknown_fields = true;
updated.date_time_input_format = FormatSettings::DateTimeInputFormat::BestEffort;
updated.csv.delimiter = updated.hive_text.fields_delimiter;
if (settings.hive_text.input_field_names.empty())
updated.hive_text.input_field_names = header.getNames();
return updated;
}
HiveTextRowInputFormat::HiveTextRowInputFormat(
const Block & header_, ReadBuffer & in_, const Params & params_, const FormatSettings & format_settings_)
: HiveTextRowInputFormat(header_, std::make_unique<PeekableReadBuffer>(in_), params_, updateFormatSettings(format_settings_))
: HiveTextRowInputFormat(header_, std::make_unique<PeekableReadBuffer>(in_), params_, updateFormatSettings(format_settings_, header_))
{
}
@ -56,5 +60,15 @@ void registerInputFormatHiveText(FormatFactory & factory)
return std::make_shared<HiveTextRowInputFormat>(sample, buf, params, settings);
});
}
void registerFileSegmentationEngineHiveText(FormatFactory & factory)
{
factory.registerFileSegmentationEngine(
"HiveText",
[](ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size) -> std::pair<bool, size_t> {
return fileSegmentationEngineCSVImpl(in, memory, min_chunk_size, 0);
});
}
}
#endif

View File

@ -11,7 +11,6 @@ namespace DB
{
/// A stream for input data in Hive Text format.
/// Parallel parsing is disabled currently.
class HiveTextRowInputFormat final : public CSVRowInputFormat
{
public:

View File

@ -0,0 +1,466 @@
#include "MySQLDumpRowInputFormat.h"
#include <IO/ReadHelpers.h>
#include <IO/PeekableReadBuffer.h>
#include <DataTypes/Serializations/SerializationNullable.h>
#include <Formats/FormatFactory.h>
#include <Formats/EscapingRuleUtils.h>
#include <Interpreters/MySQL/InterpretersMySQLDDLQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Parsers/MySQL/ASTCreateQuery.h>
#include <Parsers/MySQL/ASTCreateDefines.h>
#include <Parsers/parseQuery.h>
#include <boost/algorithm/string.hpp>
#include <base/find_symbols.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
extern const int EMPTY_DATA_PASSED;
}
namespace
{
enum MySQLQueryType
{
NONE,
INSERT,
CREATE,
};
}
MySQLDumpRowInputFormat::MySQLDumpRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(header_, in_, params_)
, table_name(format_settings_.mysql_dump.table_name)
, types(header_.getDataTypes())
, column_indexes_by_names(header_.getNamesToIndexesMap())
, format_settings(format_settings_)
{
}
static String readTableName(ReadBuffer & in)
{
skipWhitespaceIfAny(in);
if (in.eof())
throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected EOF while parsing MySQLDump format: expected table name");
String table_name;
if (*in.position() == '`')
readBackQuotedString(table_name, in);
else
readStringUntilWhitespace(table_name, in);
return table_name;
}
/// Skip all data until '*/' and all data after it until '\n'
static void skipMultilineComment(ReadBuffer & in)
{
while (!in.eof())
{
auto * pos = find_first_symbols<'*'>(in.position(), in.buffer().end());
in.position() = pos;
if (pos == in.buffer().end())
continue;
++in.position();
if (in.eof())
break;
if (*in.position() == '/')
{
++in.position();
break;
}
}
skipToNextLineOrEOF(in);
}
/// Read all data until ';' into a string.
/// Skip over quoted strings because they can contain the ';' symbol.
static String readUntilEndOfQuery(ReadBuffer & in)
{
String result;
while (!in.eof())
{
auto * pos = find_first_symbols<'\'', '"', '`', ';'>(in.position(), in.buffer().end());
result.append(in.position(), pos - in.position());
in.position() = pos;
if (pos == in.buffer().end())
continue;
result.push_back(*in.position());
if (*in.position() == ';')
{
++in.position();
break;
}
if (*in.position() == '\'')
{
readQuotedStringInto<false>(result, in);
result.push_back('\'');
}
else if (*in.position() == '"')
{
readDoubleQuotedStringInto<false>(result, in);
result.push_back('"');
}
else
{
readBackQuotedStringInto<false>(result, in);
result.push_back('`');
}
}
skipWhitespaceIfAny(in);
return result;
}
static void skipQuery(ReadBuffer & in)
{
readUntilEndOfQuery(in);
}
static bool skipUntilRowStartedWithOneOfKeywords(const std::unordered_set<String> & keywords, ReadBuffer & in, String * keyword_out = nullptr)
{
while (true)
{
skipWhitespaceIfAny(in);
if (in.eof())
return false;
if (checkString("/*", in))
skipMultilineComment(in);
else if (checkString("--", in))
skipToNextLineOrEOF(in);
else
{
String keyword;
readStringUntilWhitespace(keyword, in);
boost::to_lower(keyword);
if (keywords.contains(keyword))
{
if (keyword_out)
*keyword_out = keyword;
/// Note that the position in the buffer is now located after the keyword.
return true;
}
skipQuery(in);
}
}
}
void readUnquotedColumnName(String & name, ReadBuffer & in)
{
name.clear();
while (!in.eof())
{
char * pos = find_first_symbols<',', ')', ' '>(in.position(), in.buffer().end());
name.append(in.position(), pos - in.position());
in.position() = pos;
if (in.hasPendingData())
return;
}
}
/// Try to read column names from a list in INSERT query.
/// Like '(x, `column name`, z)'
void tryReadColumnNames(ReadBuffer & in, std::vector<String> * column_names)
{
skipWhitespaceIfAny(in);
/// Check that we have the list of columns.
if (in.eof() || *in.position() != '(')
return;
++in.position();
skipWhitespaceIfAny(in);
bool first = true;
while (!in.eof() && *in.position() != ')')
{
if (!first)
{
assertChar(',', in);
skipWhitespaceIfAny(in);
}
first = false;
String name;
if (!in.eof() && *in.position() == '`')
readBackQuotedString(name, in);
else
readUnquotedColumnName(name, in);
skipWhitespaceIfAny(in);
if (column_names)
column_names->push_back(name);
}
assertChar(')', in);
}
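/// Scan the dump for the next CREATE TABLE / INSERT INTO / REPLACE INTO query for the requested table
/// (if table_name is empty, it is set to the first table encountered) and report which kind of query was found.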
static MySQLQueryType skipToInsertOrCreateQuery(String & table_name, ReadBuffer & in, bool skip_create_query = false)
{
String current_table_name;
String keyword;
MySQLQueryType type = MySQLQueryType::NONE;
/// In MySQL dumps, INSERT queries might be replaced with REPLACE queries.
std::unordered_set<String> keywords = {"insert", "replace"};
if (!skip_create_query)
keywords.insert("create");
do
{
if (!skipUntilRowStartedWithOneOfKeywords(keywords, in, &keyword))
return MySQLQueryType::NONE;
skipWhitespaceIfAny(in);
if (keyword != "create")
{
assertStringCaseInsensitive("into", in);
type = MySQLQueryType::INSERT;
}
else
{
/// Check that it's CREATE TABLE query.
if (!checkStringCaseInsensitive("table", in))
continue;
type = MySQLQueryType::CREATE;
}
current_table_name = readTableName(in);
if (table_name.empty())
table_name = current_table_name;
} while (current_table_name != table_name);
/// Note that the position in the buffer is now located after the table name in the CREATE/INSERT query.
return type;
}
static bool skipToInsertQuery(String & table_name, ReadBuffer & in)
{
return skipToInsertOrCreateQuery(table_name, in, true) != MySQLQueryType::NONE;
}
static void skipToDataInInsertQuery(ReadBuffer & in, std::vector<String> * column_names = nullptr)
{
tryReadColumnNames(in, column_names);
skipWhitespaceIfAny(in);
assertStringCaseInsensitive("values", in);
skipWhitespaceIfAny(in);
}
static bool tryToExtractStructureFromCreateQuery(ReadBuffer & in, NamesAndTypesList & structure)
{
/// To extract the structure from a CREATE query we read it entirely into a string, parse it with MySQLParser,
/// and then take the column names and types from MySQLParser::ASTCreateDefines.
/// At this point the position is located after the table name in the CREATE query, so the beginning of the query has to be prepended manually.
String create_query_str = "CREATE TABLE _dummy " + readUntilEndOfQuery(in);
MySQLParser::ParserCreateQuery parser;
String error;
const char * start = create_query_str.data();
const char * end = create_query_str.data() + create_query_str.size();
ASTPtr query = tryParseQuery(parser, start, end, error, false, "MySQL create query", false, DBMS_DEFAULT_MAX_QUERY_SIZE, DBMS_DEFAULT_MAX_PARSER_DEPTH);
if (!query)
return false;
const auto * create_query = assert_cast<MySQLParser::ASTCreateQuery *>(query.get());
if (!create_query)
return false;
const auto * create_defines = create_query->columns_list->as<MySQLParser::ASTCreateDefines>();
if (!create_defines)
return false;
structure = MySQLInterpreter::getColumnsList(create_defines->columns);
return true;
}
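/// Helpers for walking the tuples of a VALUES list: the opening '(', the ',' between fields, the closing ')',
/// and the ',' or ';' that separates rows or terminates the INSERT query.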
static void skipStartOfRow(ReadBuffer & in)
{
skipWhitespaceIfAny(in);
assertChar('(', in);
skipWhitespaceIfAny(in);
}
static void skipFieldDelimiter(ReadBuffer & in)
{
skipWhitespaceIfAny(in);
assertChar(',', in);
skipWhitespaceIfAny(in);
}
static void skipEndOfRow(ReadBuffer & in, String & table_name)
{
skipWhitespaceIfAny(in);
assertChar(')', in);
skipWhitespaceIfAny(in);
if (!in.eof() && *in.position() == ',')
++in.position();
skipWhitespaceIfAny(in);
if (!in.eof() && *in.position() == ';')
{
/// ';' means end of INSERT query, skip until data from
/// next INSERT query into the same table or until EOF.
++in.position();
if (skipToInsertQuery(table_name, in))
skipToDataInInsertQuery(in);
}
}
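/// Position the buffer at the first data row for table_name, extracting the structure from a CREATE query
/// and/or the column names from the INSERT column list on the way.
/// A schematic example of the dump layout this expects:
///     -- MySQL dump
///     /* header comment */
///     CREATE TABLE `users` (`id` INT, `name` VARCHAR(32));
///     INSERT INTO `users` (`id`, `name`) VALUES (1, 'one'), (2, 'two');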
static void readFirstCreateAndInsertQueries(ReadBuffer & in, String & table_name, NamesAndTypesList & structure_from_create, Names & column_names)
{
auto type = skipToInsertOrCreateQuery(table_name, in);
bool insert_query_present = type == MySQLQueryType::INSERT;
if (type == MySQLQueryType::CREATE)
{
/// If we have CREATE query, we can extract columns names and types from it.
if (tryToExtractStructureFromCreateQuery(in, structure_from_create))
column_names = structure_from_create.getNames();
skipQuery(in);
insert_query_present = skipToInsertQuery(table_name, in);
}
if (!insert_query_present)
throw Exception(ErrorCodes::EMPTY_DATA_PASSED, "There are no INSERT queries{} in the MySQL dump file", table_name.empty() ? "" : " for table " + table_name);
skipToDataInInsertQuery(in, column_names.empty() ? &column_names : nullptr);
}
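/// If column names were found (from the CREATE query or from the INSERT column list) and map_column_names is
/// enabled, input columns are mapped to header columns by name; otherwise positionally, by the header.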
void MySQLDumpRowInputFormat::readPrefix()
{
NamesAndTypesList structure_from_create;
Names column_names;
readFirstCreateAndInsertQueries(*in, table_name, structure_from_create, column_names);
if (!column_names.empty() && format_settings.mysql_dump.map_column_names)
column_mapping->addColumns(column_names, column_indexes_by_names, format_settings);
else
column_mapping->setupByHeader(getPort().getHeader());
}
bool MySQLDumpRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
{
if (in->eof())
return false;
ext.read_columns.resize(types.size());
skipStartOfRow(*in);
for (size_t file_column = 0; file_column < column_mapping->column_indexes_for_input_fields.size(); ++file_column)
{
if (file_column != 0)
skipFieldDelimiter(*in);
const auto & column_index = column_mapping->column_indexes_for_input_fields[file_column];
if (column_index)
ext.read_columns[*column_index] = readField(*columns[*column_index], *column_index);
else
skipField();
}
skipEndOfRow(*in, table_name);
column_mapping->insertDefaultsForNotSeenColumns(columns, ext.read_columns);
/// If defaults_for_omitted_fields is set to 0, we should leave already inserted defaults.
if (!format_settings.defaults_for_omitted_fields)
ext.read_columns.assign(ext.read_columns.size(), true);
return true;
}
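/// Fields are parsed with the Quoted escaping rule. When null_as_default is enabled and the column is not
/// Nullable, a NULL value in the dump is replaced with the column's default value.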
bool MySQLDumpRowInputFormat::readField(IColumn & column, size_t column_idx)
{
const auto & type = types[column_idx];
const auto & serialization = serializations[column_idx];
if (format_settings.null_as_default && !type->isNullable() && !type->isLowCardinalityNullable())
return SerializationNullable::deserializeTextQuotedImpl(column, *in, format_settings, serialization);
serialization->deserializeTextQuoted(column, *in, format_settings);
return true;
}
void MySQLDumpRowInputFormat::skipField()
{
String tmp;
readQuotedFieldIntoString(tmp, *in);
}
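/// Schema inference for the MySQLDump format: prefer the structure parsed from the CREATE query; otherwise
/// use the column names from the INSERT column list (if any) and infer the types from the first data rows.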
MySQLDumpSchemaReader::MySQLDumpSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
: IRowSchemaReader(in_, format_settings_), format_settings(format_settings_), table_name(format_settings_.mysql_dump.table_name)
{
}
NamesAndTypesList MySQLDumpSchemaReader::readSchema()
{
NamesAndTypesList structure_from_create;
Names names;
readFirstCreateAndInsertQueries(in, table_name, structure_from_create, names);
if (!structure_from_create.empty())
return structure_from_create;
if (!names.empty())
setColumnNames(names);
return IRowSchemaReader::readSchema();
}
DataTypes MySQLDumpSchemaReader::readRowAndGetDataTypes()
{
if (in.eof())
return {};
skipStartOfRow(in);
DataTypes data_types;
String value;
while (!in.eof() && *in.position() != ')')
{
if (!data_types.empty())
skipFieldDelimiter(in);
readQuotedFieldIntoString(value, in);
auto type = determineDataTypeByEscapingRule(value, format_settings, FormatSettings::EscapingRule::Quoted);
data_types.push_back(std::move(type));
}
skipEndOfRow(in, table_name);
return data_types;
}
void registerInputFormatMySQLDump(FormatFactory & factory)
{
factory.registerInputFormat("MySQLDump", [](
ReadBuffer & buf,
const Block & header,
const RowInputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<MySQLDumpRowInputFormat>(buf, header, params, settings);
});
}
void registerMySQLSchemaReader(FormatFactory & factory)
{
factory.registerSchemaReader("MySQLDump", [](ReadBuffer & buf, const FormatSettings & settings)
{
return std::make_shared<MySQLDumpSchemaReader>(buf, settings);
});
}
}
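
The central trick in readUntilEndOfQuery above (a ';' only ends a statement when it appears outside of single-quoted, double-quoted and back-quoted runs) can be illustrated with a minimal standalone sketch. The sketch below uses only the standard library; the function name and the std::string_view-based interface are illustrative and are not part of the ClickHouse code above.

#include <cstddef>
#include <iostream>
#include <string>
#include <string_view>

/// Returns the statement ending at the first unquoted ';' (inclusive) and advances pos past it.
static std::string readUntilEndOfQuerySketch(std::string_view dump, size_t & pos)
{
    std::string result;
    char quote = 0;  /// the currently open quote character, or 0 when outside of quotes
    while (pos < dump.size())
    {
        char c = dump[pos++];
        result.push_back(c);
        if (quote)
        {
            if (c == quote)
                quote = 0;  /// closing quote (escape sequences are ignored in this sketch)
        }
        else if (c == '\'' || c == '"' || c == '`')
        {
            quote = c;  /// opening quote
        }
        else if (c == ';')
        {
            break;  /// an unquoted ';' terminates the statement
        }
    }
    return result;
}

int main()
{
    std::string_view dump = "INSERT INTO `t` VALUES ('a;b');\nINSERT INTO `t` VALUES (2);";
    size_t pos = 0;
    std::cout << readUntilEndOfQuerySketch(dump, pos) << '\n';  /// prints: INSERT INTO `t` VALUES ('a;b');
}

The real function differs in that it works on a streaming ReadBuffer, scans in bulk with find_first_symbols, and delegates quoted runs to readQuotedStringInto / readDoubleQuotedStringInto / readBackQuotedStringInto, so escape sequences inside the quotes are handled as well.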

View File

@ -0,0 +1,42 @@
#pragma once
#include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Formats/FormatSettings.h>
namespace DB
{
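/// Reads the data of a single table from a MySQL dump file (e.g. produced by mysqldump): comments and
/// unrelated queries are skipped, and rows are read from the matching INSERT/REPLACE queries.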
class MySQLDumpRowInputFormat final : public IRowInputFormat
{
public:
MySQLDumpRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_);
String getName() const override { return "MySQLDumpRowInputFormat"; }
void readPrefix() override;
private:
bool readRow(MutableColumns & columns, RowReadExtension &) override;
bool readField(IColumn & column, size_t column_idx);
void skipField();
String table_name;
DataTypes types;
std::unordered_map<String, size_t> column_indexes_by_names;
const FormatSettings format_settings;
};
class MySQLDumpSchemaReader : public IRowSchemaReader
{
public:
MySQLDumpSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings);
private:
NamesAndTypesList readSchema() override;
DataTypes readRowAndGetDataTypes() override;
const FormatSettings format_settings;
String table_name;
};
}

Some files were not shown because too many files have changed in this diff.