Merge branch 'master' into complete_zk_api

This commit is contained in:
alesapin 2020-11-04 20:37:30 +03:00
commit e2bce14aed
71 changed files with 1066 additions and 255 deletions

View File

@ -90,6 +90,7 @@ toc_title: Adopters
| <a href="https://www.splunk.com/" class="favicon">Splunk</a> | Business Analytics | Main product | — | — | [Slides in English, January 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup12/splunk.pdf) |
| <a href="https://www.spotify.com" class="favicon">Spotify</a> | Music | Experimentation | — | — | [Slides, July 2018](https://www.slideshare.net/glebus/using-clickhouse-for-experimentation-104247173) |
| <a href="https://www.staffcop.ru/" class="favicon">Staffcop</a> | Information Security | Main Product | — | — | [Official website, Documentation](https://www.staffcop.ru/sce43) |
| <a href="https://www.teralytics.net/" class="favicon">Teralytics</a> | Mobility | Analytics | — | — | [Tech blog](https://www.teralytics.net/knowledge-hub/visualizing-mobility-data-the-scalability-challenge) |
| <a href="https://www.tencent.com" class="favicon">Tencent</a> | Big Data | Data processing | — | — | [Slides in Chinese, October 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup19/5.%20ClickHouse大数据集群应用_李俊飞腾讯网媒事业部.pdf) |
| <a href="https://www.tencent.com" class="favicon">Tencent</a> | Messaging | Logging | — | — | [Talk in Chinese, November 2019](https://youtu.be/T-iVQRuw-QY?t=5050) |
| <a href="https://trafficstars.com/" class="favicon">Traffic Stars</a> | AD network | — | — | — | [Slides in Russian, May 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup15/lightning/ninja.pdf) |

View File

@ -680,6 +680,21 @@ Example:
log_queries=1
```
## log_queries_min_query_duration_ms {#settings-log-queries-min-query-duration-ms}
Minimal execution time a query must take to be logged in the following tables:
- `system.query_log`
- `system.query_thread_log`
Only queries of the following types reach the log:
- `QUERY_FINISH`
- `EXCEPTION_WHILE_PROCESSING`
- Type: milliseconds
- Default value: 0 (any query)
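For example, to log only queries that run for at least one second (a minimal sketch; the per-session assignment and the threshold value are illustrative):
``` sql
SET log_queries_min_query_duration_ms = 1000;
```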
## log_queries_min_type {#settings-log-queries-min-type}
Minimal type of queries to log in `query_log`.
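Possible values (from low to high): `QUERY_START`, `QUERY_FINISH`, `EXCEPTION_BEFORE_START`, `EXCEPTION_WHILE_PROCESSING`. A minimal usage sketch (a per-session assignment is assumed):
``` sql
SET log_queries_min_type = 'QUERY_FINISH';
```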

View File

@ -6,19 +6,21 @@ You can use this table to get information similar to the [DESCRIBE TABLE](../../
The `system.columns` table contains the following columns (the column type is shown in brackets):
- `database` (String) — Database name.
- `table` (String) — Table name.
- `name` (String) — Column name.
- `type` (String) — Column type.
- `default_kind` (String) — Expression type (`DEFAULT`, `MATERIALIZED`, `ALIAS`) for the default value, or an empty string if it is not defined.
- `default_expression` (String) — Expression for the default value, or an empty string if it is not defined.
- `data_compressed_bytes` (UInt64) — The size of compressed data, in bytes.
- `data_uncompressed_bytes` (UInt64) — The size of decompressed data, in bytes.
- `marks_bytes` (UInt64) — The size of marks, in bytes.
- `comment` (String) — Comment on the column, or an empty string if it is not defined.
- `is_in_partition_key` (UInt8) — Flag that indicates whether the column is in the partition expression.
- `is_in_sorting_key` (UInt8) — Flag that indicates whether the column is in the sorting key expression.
- `is_in_primary_key` (UInt8) — Flag that indicates whether the column is in the primary key expression.
- `is_in_sampling_key` (UInt8) — Flag that indicates whether the column is in the sampling key expression.
- `database` ([String](../../sql-reference/data-types/string.md)) — Database name.
- `table` ([String](../../sql-reference/data-types/string.md)) — Table name.
- `name` ([String](../../sql-reference/data-types/string.md)) — Column name.
- `type` ([String](../../sql-reference/data-types/string.md)) — Column type.
- `position` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Ordinal position of a column in a table starting with 1.
- `default_kind` ([String](../../sql-reference/data-types/string.md)) — Expression type (`DEFAULT`, `MATERIALIZED`, `ALIAS`) for the default value, or an empty string if it is not defined.
- `default_expression` ([String](../../sql-reference/data-types/string.md)) — Expression for the default value, or an empty string if it is not defined.
- `data_compressed_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md)) — The size of compressed data, in bytes.
- `data_uncompressed_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md)) — The size of decompressed data, in bytes.
- `marks_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md)) — The size of marks, in bytes.
- `comment` ([String](../../sql-reference/data-types/string.md)) — Comment on the column, or an empty string if it is not defined.
- `is_in_partition_key` ([UInt8](../../sql-reference/data-types/int-uint.md)) — Flag that indicates whether the column is in the partition expression.
- `is_in_sorting_key` ([UInt8](../../sql-reference/data-types/int-uint.md)) — Flag that indicates whether the column is in the sorting key expression.
- `is_in_primary_key` ([UInt8](../../sql-reference/data-types/int-uint.md)) — Flag that indicates whether the column is in the primary key expression.
- `is_in_sampling_key` ([UInt8](../../sql-reference/data-types/int-uint.md)) — Flag that indicates whether the column is in the sampling key expression.
- `compression_codec` ([String](../../sql-reference/data-types/string.md)) — Compression codec name.
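A short query sketch against this table (the filter values are illustrative):
``` sql
SELECT name, type, position, compression_codec
FROM system.columns
WHERE database = 'system' AND table = 'query_log'
LIMIT 5
```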
[Original article](https://clickhouse.tech/docs/en/operations/system_tables/columns) <!--hide-->

View File

@ -780,4 +780,42 @@ Result:
└──────────────────────────────────┘
```
## formatRowNoNewline {#formatrownonewline}
Converts arbitrary expressions into a string via the given format and trims the trailing `\n`, if any.
**Syntax**
``` sql
formatRowNoNewline(format, x, y, ...)
```
**Parameters**
- `format` — Text format. For example, [CSV](../../interfaces/formats.md#csv), [TSV](../../interfaces/formats.md#tabseparated).
- `x`,`y`, ... — Expressions.
**Returned value**
- A formatted string.
**Example**
Query:
``` sql
SELECT formatRowNoNewline('CSV', number, 'good')
FROM numbers(3)
```
Result:
``` text
┌─formatRowNoNewline('CSV', number, 'good')─┐
│ 0,"good" │
│ 1,"good" │
│ 2,"good" │
└───────────────────────────────────────────┘
```
[Original article](https://clickhouse.tech/docs/en/query_language/functions/type_conversion_functions/) <!--hide-->

View File

@ -6,19 +6,21 @@
The `system.columns` table contains the following columns (the column type is shown in brackets):
- `database` (String) — Database name.
- `table` (String) — Table name.
- `name` (String) — Column name.
- `type` (String) — Column type.
- `default_kind` (String) — Expression type (`DEFAULT`, `MATERIALIZED`, `ALIAS`) for the default value, or an empty string.
- `default_expression` (String) — Expression for the default value, or an empty string.
- `data_compressed_bytes` (UInt64) — The size of compressed data, in bytes.
- `data_uncompressed_bytes` (UInt64) — The size of decompressed data, in bytes.
- `marks_bytes` (UInt64) — The size of marks, in bytes.
- `comment` (String) — Comment on the column, or an empty string.
- `is_in_partition_key` (UInt8) — Flag that indicates whether the column is in the partition expression.
- `is_in_sorting_key` (UInt8) — Flag that indicates whether the column is in the sorting key expression.
- `is_in_primary_key` (UInt8) — Flag that indicates whether the column is in the primary key expression.
- `is_in_sampling_key` (UInt8) — Flag that indicates whether the column is in the sampling key expression.
- `database` ([String](../../sql-reference/data-types/string.md)) — Database name.
- `table` ([String](../../sql-reference/data-types/string.md)) — Table name.
- `name` ([String](../../sql-reference/data-types/string.md)) — Column name.
- `type` ([String](../../sql-reference/data-types/string.md)) — Column type.
- `position` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Ordinal position of a column in a table starting with 1.
- `default_kind` ([String](../../sql-reference/data-types/string.md)) — Expression type (`DEFAULT`, `MATERIALIZED`, `ALIAS`) for the default value, or an empty string.
- `default_expression` ([String](../../sql-reference/data-types/string.md)) — Expression for the default value, or an empty string.
- `data_compressed_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md)) — The size of compressed data, in bytes.
- `data_uncompressed_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md)) — The size of decompressed data, in bytes.
- `marks_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md)) — The size of marks, in bytes.
- `comment` ([String](../../sql-reference/data-types/string.md)) — Comment on the column, or an empty string.
- `is_in_partition_key` ([UInt8](../../sql-reference/data-types/int-uint.md)) — Flag that indicates whether the column is in the partition expression.
- `is_in_sorting_key` ([UInt8](../../sql-reference/data-types/int-uint.md)) — Flag that indicates whether the column is in the sorting key expression.
- `is_in_primary_key` ([UInt8](../../sql-reference/data-types/int-uint.md)) — Flag that indicates whether the column is in the primary key expression.
- `is_in_sampling_key` ([UInt8](../../sql-reference/data-types/int-uint.md)) — Flag that indicates whether the column is in the sampling key expression.
- `compression_codec` ([String](../../sql-reference/data-types/string.md)) — Compression codec name.
[Original article](https://clickhouse.tech/docs/ru/operations/system_tables/columns) <!--hide-->

View File

@ -772,4 +772,43 @@ FROM numbers(3)
└──────────────────────────────────┘
```
## formatRowNoNewline {#formatrownonewline}
Converts arbitrary expressions into a string via the given format and trims any trailing `\n`, if one appeared.
**Syntax**
``` sql
formatRowNoNewline(format, x, y, ...)
```
**Parameters**
- `format` — Text format. For example, [CSV](../../interfaces/formats.md#csv), [TSV](../../interfaces/formats.md#tabseparated).
- `x`,`y`, ... — Expressions.
**Returned value**
- A formatted string (in text formats, without a trailing newline).
**Example**
Query:
``` sql
SELECT formatRowNoNewline('CSV', number, 'good')
FROM numbers(3)
```
Result:
``` text
┌─formatRowNoNewline('CSV', number, 'good')─┐
│ 0,"good" │
│ 1,"good" │
│ 2,"good" │
└───────────────────────────────────────────┘
```
[Original article](https://clickhouse.tech/docs/ru/query_language/functions/type_conversion_functions/) <!--hide-->

View File

@ -3,7 +3,7 @@ machine_translated: true
machine_translated_rev: 5decc73b5dc60054f19087d3690c4eb99446a6c3
---
# 系统。query_log {#system_tables-query_log}
# system.query_log {#system_tables-query_log}
Contains information about executed queries, for example, start time, processing duration, and error messages.
@ -140,4 +140,4 @@ Settings.Values: ['0','random','1','10000000000']
**See Also**
- [系统。query_thread_log](../../operations/system-tables/query_thread_log.md#system_tables-query_thread_log) — This table contains information about each query execution thread.
- [system.query_thread_log](../../operations/system-tables/query_thread_log.md#system_tables-query_thread_log) — This table contains information about each query execution thread.
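A short query sketch against this table (the selected columns and the filter are illustrative):
``` sql
SELECT event_time, query_duration_ms, query
FROM system.query_log
WHERE type = 'QueryFinish'
ORDER BY event_time DESC
LIMIT 5
```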

View File

@ -200,9 +200,6 @@ public:
}
private:
friend void qdigest_test(int normal_size, UInt64 value_limit, const std::vector<UInt64> & values, int queries_count, bool verbose);
friend void rs_perf_test();
/// We allocate a little memory on the stack - to avoid allocations when there are many objects with a small number of elements.
using Array = DB::PODArrayWithStackMemory<T, 64>;

View File

@ -172,7 +172,7 @@ protected:
void finalizeQueryProfiler();
void logToQueryThreadLog(QueryThreadLog & thread_log, const String & current_database);
void logToQueryThreadLog(QueryThreadLog & thread_log, const String & current_database, std::chrono::time_point<std::chrono::system_clock> now);
void assertState(const std::initializer_list<int> & permitted_states, const char * description = nullptr) const;

View File

@ -126,6 +126,7 @@ class IColumn;
M(UInt64, merge_tree_coarse_index_granularity, 8, "If the index segment can contain the required keys, divide it into as many parts and recursively check them.", 0) \
M(UInt64, merge_tree_max_rows_to_use_cache, (128 * 8192), "The maximum number of rows per request, to use the cache of uncompressed data. If the request is large, the cache is not used. (For large queries not to flush out the cache.)", 0) \
M(UInt64, merge_tree_max_bytes_to_use_cache, (192 * 10 * 1024 * 1024), "The maximum number of bytes per request, to use the cache of uncompressed data. If the request is large, the cache is not used. (For large queries not to flush out the cache.)", 0) \
M(Bool, do_not_merge_across_partitions_select_final, false, "Merge parts only in one partition in select final", 0) \
\
M(UInt64, mysql_max_rows_to_insert, 65536, "The maximum number of rows in MySQL batch insertion of the MySQL storage engine", 0) \
\
@ -150,6 +151,7 @@ class IColumn;
\
M(Bool, log_queries, 1, "Log requests and write the log to the system table.", 0) \
M(LogQueriesType, log_queries_min_type, QueryLogElementType::QUERY_START, "Minimal type in query_log to log, possible values (from low to high): QUERY_START, QUERY_FINISH, EXCEPTION_BEFORE_START, EXCEPTION_WHILE_PROCESSING.", 0) \
M(Milliseconds, log_queries_min_query_duration_ms, 0, "Minimal time for the query to run, to get to the query_log/query_thread_log.", 0) \
M(UInt64, log_queries_cut_to_length, 100000, "If query length is greater than specified threshold (in bytes), then cut query when writing to query log. Also limit length of printed query in ordinary text log.", 0) \
\
M(DistributedProductMode, distributed_product_mode, DistributedProductMode::DENY, "How are distributed subqueries performed inside IN or JOIN sections?", IMPORTANT) \

View File

@ -114,7 +114,8 @@ void DatabaseAtomic::dropTable(const Context &, const String & table_name, bool
DatabaseWithDictionaries::detachTableUnlocked(table_name, lock); /// Should never throw
table_name_to_path.erase(table_name);
}
tryRemoveSymlink(table_name);
if (table->storesDataOnDisk())
tryRemoveSymlink(table_name);
/// Remove the inner table (if any) to avoid deadlock
/// (due to attempt to execute DROP from the worker thread)
if (auto * mv = dynamic_cast<StorageMaterializedView *>(table.get()))
@ -145,7 +146,7 @@ void DatabaseAtomic::renameTable(const Context & context, const String & table_n
String old_metadata_path = getObjectMetadataPath(table_name);
String new_metadata_path = to_database.getObjectMetadataPath(to_table_name);
auto detach = [](DatabaseAtomic & db, const String & table_name_)
auto detach = [](DatabaseAtomic & db, const String & table_name_, bool has_symlink)
{
auto it = db.table_name_to_path.find(table_name_);
String table_data_path_saved;
@ -155,7 +156,7 @@ void DatabaseAtomic::renameTable(const Context & context, const String & table_n
assert(!table_data_path_saved.empty() || db.dictionaries.find(table_name_) != db.dictionaries.end());
db.tables.erase(table_name_);
db.table_name_to_path.erase(table_name_);
if (!table_data_path_saved.empty())
if (has_symlink)
db.tryRemoveSymlink(table_name_);
return table_data_path_saved;
};
@ -166,7 +167,8 @@ void DatabaseAtomic::renameTable(const Context & context, const String & table_n
if (table_data_path_.empty())
return;
db.table_name_to_path.emplace(table_name_, table_data_path_);
db.tryCreateSymlink(table_name_, table_data_path_);
if (table_->storesDataOnDisk())
db.tryCreateSymlink(table_name_, table_data_path_);
};
auto assert_can_move_mat_view = [inside_database](const StoragePtr & table_)
@ -228,9 +230,9 @@ void DatabaseAtomic::renameTable(const Context & context, const String & table_n
renameNoReplace(old_metadata_path, new_metadata_path);
/// After metadata was successfully moved, the following methods should not throw (if they do, it's a logical error)
table_data_path = detach(*this, table_name);
table_data_path = detach(*this, table_name, table->storesDataOnDisk());
if (exchange)
other_table_data_path = detach(other_db, to_table_name);
other_table_data_path = detach(other_db, to_table_name, other_table->storesDataOnDisk());
auto old_table_id = table->getStorageID();
@ -286,7 +288,8 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
DatabaseCatalog::instance().removeUUIDMappingFinally(query.uuid);
throw;
}
tryCreateSymlink(query.table, table_data_path);
if (table->storesDataOnDisk())
tryCreateSymlink(query.table, table_data_path);
}
void DatabaseAtomic::commitAlterTable(const StorageID & table_id, const String & table_metadata_tmp_path, const String & table_metadata_path)
@ -383,17 +386,18 @@ void DatabaseAtomic::loadStoredObjects(Context & context, bool has_force_restore
Poco::File(path_to_table_symlinks).createDirectories();
for (const auto & table : table_names)
tryCreateSymlink(table.first, table.second);
tryCreateSymlink(table.first, table.second, true);
}
}
void DatabaseAtomic::tryCreateSymlink(const String & table_name, const String & actual_data_path)
void DatabaseAtomic::tryCreateSymlink(const String & table_name, const String & actual_data_path, bool if_data_path_exist)
{
try
{
String link = path_to_table_symlinks + escapeForFileName(table_name);
String data = Poco::Path(global_context.getPath()).makeAbsolute().toString() + actual_data_path;
Poco::File{data}.linkTo(link, Poco::File::LINK_SYMBOLIC);
Poco::File data = Poco::Path(global_context.getPath()).makeAbsolute().toString() + actual_data_path;
if (!if_data_path_exist || data.exists())
data.linkTo(link, Poco::File::LINK_SYMBOLIC);
}
catch (...)
{

View File

@ -55,7 +55,7 @@ public:
UUID tryGetTableUUID(const String & table_name) const override;
void tryCreateSymlink(const String & table_name, const String & actual_data_path);
void tryCreateSymlink(const String & table_name, const String & actual_data_path, bool if_data_path_exist = false);
void tryRemoveSymlink(const String & table_name);
void waitDetachedTableNotInUse(const UUID & uuid);

View File

@ -321,7 +321,7 @@ void DatabaseOnDisk::renameTable(
/// Special case: usually no actions with symlinks are required when detaching/attaching table,
/// but not when moving from Atomic database to Ordinary
if (from_atomic_to_ordinary)
if (from_atomic_to_ordinary && table->storesDataOnDisk())
{
auto & atomic_db = assert_cast<DatabaseAtomic &>(*this);
atomic_db.tryRemoveSymlink(table_name);

View File

@ -207,8 +207,22 @@ void AsynchronousMetrics::update()
/// We must update the value of total_memory_tracker periodically.
/// Otherwise it might be calculated incorrectly - it can include a "drift" of memory amount.
/// See https://github.com/ClickHouse/ClickHouse/issues/10293
total_memory_tracker.set(data.resident);
CurrentMetrics::set(CurrentMetrics::MemoryTracking, data.resident);
{
Int64 amount = total_memory_tracker.get();
Int64 peak = total_memory_tracker.getPeak();
Int64 new_peak = data.resident;
LOG_DEBUG(&Poco::Logger::get("AsynchronousMetrics"),
"MemoryTracking: was {}, peak {}, will set to {} (RSS), difference: {}",
ReadableSize(amount),
ReadableSize(peak),
ReadableSize(new_peak),
ReadableSize(new_peak - peak)
);
total_memory_tracker.set(new_peak);
CurrentMetrics::set(CurrentMetrics::MemoryTracking, new_peak);
}
}
#endif

View File

@ -134,7 +134,10 @@ void DatabaseCatalog::loadDatabases()
loadMarkedAsDroppedTables();
auto task_holder = global_context->getSchedulePool().createTask("DatabaseCatalog", [this](){ this->dropTableDataTask(); });
drop_task = std::make_unique<BackgroundSchedulePoolTaskHolder>(std::move(task_holder));
(*drop_task)->activateAndSchedule();
(*drop_task)->activate();
std::lock_guard lock{tables_marked_dropped_mutex};
if (!tables_marked_dropped.empty())
(*drop_task)->schedule();
}
void DatabaseCatalog::shutdownImpl()
@ -760,14 +763,15 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr
std::lock_guard lock(tables_marked_dropped_mutex);
if (ignore_delay)
tables_marked_dropped.push_front({table_id, table, dropped_metadata_path, 0});
tables_marked_dropped.push_front({table_id, table, dropped_metadata_path, drop_time});
else
tables_marked_dropped.push_back({table_id, table, dropped_metadata_path, drop_time});
tables_marked_dropped.push_back({table_id, table, dropped_metadata_path, drop_time + drop_delay_sec});
tables_marked_dropped_ids.insert(table_id.uuid);
CurrentMetrics::add(CurrentMetrics::TablesToDropQueueSize, 1);
/// If list of dropped tables was empty, start a drop task
if (drop_task && tables_marked_dropped.size() == 1)
/// If list of dropped tables was empty, start a drop task.
/// If ignore_delay is set, schedule drop task as soon as possible.
if (drop_task && (tables_marked_dropped.size() == 1 || ignore_delay))
(*drop_task)->schedule();
}
@ -777,26 +781,40 @@ void DatabaseCatalog::dropTableDataTask()
/// Table can be removed when it's not used by queries and drop_delay_sec has elapsed since it was marked as dropped.
bool need_reschedule = true;
/// Default reschedule time for the case when we are waiting for reference count to become 1.
size_t schedule_after_ms = reschedule_time_ms;
TableMarkedAsDropped table;
try
{
std::lock_guard lock(tables_marked_dropped_mutex);
assert(!tables_marked_dropped.empty());
time_t current_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
time_t min_drop_time = std::numeric_limits<time_t>::max();
size_t tables_in_use_count = 0;
auto it = std::find_if(tables_marked_dropped.begin(), tables_marked_dropped.end(), [&](const auto & elem)
{
bool not_in_use = !elem.table || elem.table.unique();
bool old_enough = elem.drop_time + drop_delay_sec < current_time;
bool old_enough = elem.drop_time <= current_time;
min_drop_time = std::min(min_drop_time, elem.drop_time);
tables_in_use_count += !not_in_use;
return not_in_use && old_enough;
});
if (it != tables_marked_dropped.end())
{
table = std::move(*it);
LOG_INFO(log, "Will try drop {}", table.table_id.getNameForLogs());
LOG_INFO(log, "Have {} tables in drop queue ({} of them are in use), will try drop {}",
tables_marked_dropped.size(), tables_in_use_count, table.table_id.getNameForLogs());
tables_marked_dropped.erase(it);
/// Schedule the task as soon as possible, while there are suitable tables to drop.
schedule_after_ms = 0;
}
else
else if (current_time < min_drop_time)
{
LOG_TRACE(log, "Not found any suitable tables to drop, still have {} tables in drop queue", tables_marked_dropped.size());
/// We are waiting for drop_delay_sec to elapse; no sense to wake up until min_drop_time.
/// If new table is added to the queue with ignore_delay flag, schedule() is called to wakeup the task earlier.
schedule_after_ms = (min_drop_time - current_time) * 1000;
LOG_TRACE(log, "Not found any suitable tables to drop, still have {} tables in drop queue ({} of them are in use). "
"Will check again after {} seconds", tables_marked_dropped.size(), tables_in_use_count, min_drop_time - current_time);
}
need_reschedule = !tables_marked_dropped.empty();
}
@ -820,11 +838,15 @@ void DatabaseCatalog::dropTableDataTask()
tryLogCurrentException(log, "Cannot drop table " + table.table_id.getNameForLogs() +
". Will retry later.");
{
table.drop_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) + drop_error_cooldown_sec;
std::lock_guard lock(tables_marked_dropped_mutex);
tables_marked_dropped.emplace_back(std::move(table));
/// If list of dropped tables was empty, schedule a task to retry deletion.
if (tables_marked_dropped.size() == 1)
{
need_reschedule = true;
schedule_after_ms = drop_error_cooldown_sec * 1000;
}
}
}
@ -833,7 +855,7 @@ void DatabaseCatalog::dropTableDataTask()
/// Do not schedule a task if there is no tables to drop
if (need_reschedule)
(*drop_task)->scheduleAfter(reschedule_time_ms);
(*drop_task)->scheduleAfter(schedule_after_ms);
}
void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table)

View File

@ -234,6 +234,7 @@ private:
void dropTableFinally(const TableMarkedAsDropped & table);
static constexpr size_t reschedule_time_ms = 100;
static constexpr time_t drop_error_cooldown_sec = 5;
private:
using UUIDToDatabaseMap = std::unordered_map<UUID, DatabasePtr>;

View File

@ -6,6 +6,7 @@
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
#include <Common/Macros.h>
#include <Common/randomSeed.h>
#include <Core/Defines.h>
#include <Core/Settings.h>
@ -362,7 +363,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
if (col_decl.type)
{
const auto & final_column_name = col_decl.name;
const auto tmp_column_name = final_column_name + "_tmp";
const auto tmp_column_name = final_column_name + "_tmp_alter" + toString(randomSeed());
const auto * data_type_ptr = column_names_and_types.back().type.get();
default_expr_list->children.emplace_back(

View File

@ -5,8 +5,10 @@
#include <DataTypes/DataTypeFactory.h>
#include <Formats/FormatSettings.h>
#include <IO/ReadBufferFromString.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTQueryParameter.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <Interpreters/addTypeConversionToAST.h>
@ -25,6 +27,8 @@ void ReplaceQueryParameterVisitor::visit(ASTPtr & ast)
{
if (ast->as<ASTQueryParameter>())
visitQueryParameter(ast);
else if (ast->as<ASTIdentifier>())
visitIdentifier(ast);
else
visitChildren(ast);
}
@ -71,4 +75,27 @@ void ReplaceQueryParameterVisitor::visitQueryParameter(ASTPtr & ast)
ast->setAlias(alias);
}
void ReplaceQueryParameterVisitor::visitIdentifier(ASTPtr & ast)
{
auto & ast_identifier = ast->as<ASTIdentifier &>();
if (ast_identifier.children.empty())
return;
auto & name_parts = ast_identifier.name_parts;
for (size_t i = 0, j = 0, size = name_parts.size(); i < size; ++i)
{
if (name_parts[i].empty())
{
const auto & ast_param = ast_identifier.children[j++]->as<ASTQueryParameter &>();
name_parts[i] = getParamValue(ast_param.name);
}
}
if (!ast_identifier.semantic->special && name_parts.size() >= 2)
ast_identifier.semantic->table = ast_identifier.name_parts.end()[-2];
ast_identifier.resetFullName();
ast_identifier.children.clear();
}
}

View File

@ -9,6 +9,7 @@ namespace DB
class ASTQueryParameter;
/// Visit substitutions in a query, replace ASTQueryParameter with ASTLiteral.
/// Rebuild ASTIdentifiers if some parts are ASTQueryParameter.
class ReplaceQueryParameterVisitor
{
public:
@ -21,6 +22,7 @@ public:
private:
const NameToNameMap & query_parameters;
const String & getParamValue(const String & name);
void visitIdentifier(ASTPtr & ast);
void visitQueryParameter(ASTPtr & ast);
void visitChildren(ASTPtr & ast);
};

View File

@ -242,8 +242,15 @@ void ThreadStatus::finalizePerformanceCounters()
{
const auto & settings = query_context->getSettingsRef();
if (settings.log_queries && settings.log_query_threads)
if (auto thread_log = global_context->getQueryThreadLog())
logToQueryThreadLog(*thread_log, query_context->getCurrentDatabase());
{
const auto now = std::chrono::system_clock::now();
Int64 query_duration_ms = (time_in_microseconds(now) - query_start_time_microseconds) / 1000;
if (query_duration_ms >= settings.log_queries_min_query_duration_ms.totalMilliseconds())
{
if (auto thread_log = global_context->getQueryThreadLog())
logToQueryThreadLog(*thread_log, query_context->getCurrentDatabase(), now);
}
}
}
}
catch (...)
@ -322,15 +329,14 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
#endif
}
void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log, const String & current_database)
void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log, const String & current_database, std::chrono::time_point<std::chrono::system_clock> now)
{
QueryThreadLogElement elem;
// construct current_time and current_time_microseconds using the same time point
// so that the two times will always be equal up to a precision of a second.
const auto now = std::chrono::system_clock::now();
auto current_time = time_in_seconds(now);
auto current_time_microseconds = time_in_microseconds(now);
auto current_time = time_in_seconds(now);
auto current_time_microseconds = time_in_microseconds(now);
elem.event_time = current_time;
elem.event_time_microseconds = current_time_microseconds;

View File

@ -17,6 +17,7 @@
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTShowProcesslistQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
@ -241,7 +242,7 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c
/// Update performance counters before logging to query_log
CurrentThread::finalizePerformanceCounters();
if (settings.log_queries && elem.type >= settings.log_queries_min_type)
if (settings.log_queries && elem.type >= settings.log_queries_min_type && !settings.log_queries_min_query_duration_ms.totalMilliseconds())
if (auto query_log = context.getQueryLog())
query_log->add(elem);
@ -337,6 +338,27 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// TODO Parser should fail early when max_query_size limit is reached.
ast = parseQuery(parser, begin, end, "", max_query_size, settings.max_parser_depth);
/// Interpret SETTINGS clauses as early as possible (before invoking the corresponding interpreter),
/// to allow settings to take effect.
if (const auto * select_query = ast->as<ASTSelectQuery>())
{
if (auto new_settings = select_query->settings())
InterpreterSetQuery(new_settings, context).executeForCurrentContext();
}
else if (const auto * select_with_union_query = ast->as<ASTSelectWithUnionQuery>())
{
if (!select_with_union_query->list_of_selects->children.empty())
{
if (auto new_settings = select_with_union_query->list_of_selects->children.back()->as<ASTSelectQuery>()->settings())
InterpreterSetQuery(new_settings, context).executeForCurrentContext();
}
}
else if (const auto * query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get()))
{
if (query_with_output->settings_ast)
InterpreterSetQuery(query_with_output->settings_ast, context).executeForCurrentContext();
}
auto * insert_query = ast->as<ASTInsertQuery>();
if (insert_query && insert_query->settings_ast)
@ -461,9 +483,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
QueryPipeline & pipeline = res.pipeline;
bool use_processors = pipeline.initialized();
if (res.pipeline.initialized())
use_processors = true;
if (const auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
{
/// Save insertion table (not table function). TODO: support remote() table function.
@ -552,7 +571,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (settings.log_query_settings)
elem.query_settings = std::make_shared<Settings>(context.getSettingsRef());
if (elem.type >= settings.log_queries_min_type)
if (elem.type >= settings.log_queries_min_type && !settings.log_queries_min_query_duration_ms.totalMilliseconds())
{
if (auto query_log = context.getQueryLog())
query_log->add(elem);
@ -588,8 +607,12 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
};
/// Also make it possible for the caller to log successful query finish and exceptions during execution.
auto finish_callback = [elem, &context, ast, log_queries, log_queries_min_type = settings.log_queries_min_type,
status_info_to_query_log]
auto finish_callback = [elem, &context, ast,
log_queries,
log_queries_min_type = settings.log_queries_min_type,
log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds(),
status_info_to_query_log
]
(IBlockInputStream * stream_in, IBlockOutputStream * stream_out, QueryPipeline * query_pipeline) mutable
{
QueryStatus * process_list_elem = context.getProcessListElement();
@ -655,7 +678,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.thread_ids = std::move(info.thread_ids);
elem.profile_counters = std::move(info.profile_counters);
if (log_queries && elem.type >= log_queries_min_type)
if (log_queries && elem.type >= log_queries_min_type && Int64(elem.query_duration_ms) >= log_queries_min_query_duration_ms)
{
if (auto query_log = context.getQueryLog())
query_log->add(elem);
@ -694,8 +717,11 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
};
auto exception_callback = [elem, &context, ast, log_queries, log_queries_min_type = settings.log_queries_min_type, quota(quota),
status_info_to_query_log] () mutable
auto exception_callback = [elem, &context, ast,
log_queries,
log_queries_min_type = settings.log_queries_min_type,
log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds(),
quota(quota), status_info_to_query_log] () mutable
{
if (quota)
quota->used(Quota::ERRORS, 1, /* check_exceeded = */ false);
@ -729,7 +755,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
logException(context, elem);
/// In case of exception we log internal queries also
if (log_queries && elem.type >= log_queries_min_type)
if (log_queries && elem.type >= log_queries_min_type && Int64(elem.query_duration_ms) >= log_queries_min_query_duration_ms)
{
if (auto query_log = context.getQueryLog())
query_log->add(elem);
@ -802,12 +828,12 @@ BlockIO executeQuery(
}
BlockIO executeQuery(
const String & query,
Context & context,
bool internal,
QueryProcessingStage::Enum stage,
bool may_have_embedded_data,
bool allow_processors)
const String & query,
Context & context,
bool internal,
QueryProcessingStage::Enum stage,
bool may_have_embedded_data,
bool allow_processors)
{
BlockIO res = executeQuery(query, context, internal, stage, may_have_embedded_data);
@ -876,7 +902,6 @@ void executeQuery(
}
else if (streams.in)
{
/// FIXME: try to prettify this cast using `as<>()`
const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
WriteBuffer * out_buf = &ostr;
@ -895,9 +920,6 @@ void executeQuery(
? getIdentifierName(ast_query_with_output->format)
: context.getDefaultFormat();
if (ast_query_with_output && ast_query_with_output->settings_ast)
InterpreterSetQuery(ast_query_with_output->settings_ast, context).executeForCurrentContext();
BlockOutputStreamPtr out = context.getOutputFormat(format_name, *out_buf, streams.in->getHeader());
/// Save previous progress callback if any. TODO Do it more conveniently.
@ -936,9 +958,6 @@ void executeQuery(
? getIdentifierName(ast_query_with_output->format)
: context.getDefaultFormat();
if (ast_query_with_output && ast_query_with_output->settings_ast)
InterpreterSetQuery(ast_query_with_output->settings_ast, context).executeForCurrentContext();
if (!pipeline.isCompleted())
{
pipeline.addSimpleTransform([](const Block & header)

View File

@ -16,26 +16,48 @@ namespace ErrorCodes
extern const int SYNTAX_ERROR;
}
ASTIdentifier::ASTIdentifier(const String & short_name)
ASTIdentifier::ASTIdentifier(const String & short_name, ASTPtr && name_param)
: full_name(short_name), name_parts{short_name}, semantic(std::make_shared<IdentifierSemanticImpl>())
{
assert(!full_name.empty());
if (name_param == nullptr)
assert(!full_name.empty());
else
children.push_back(std::move(name_param));
}
ASTIdentifier::ASTIdentifier(std::vector<String> && name_parts_, bool special)
ASTIdentifier::ASTIdentifier(std::vector<String> && name_parts_, bool special, std::vector<ASTPtr> && name_params)
: name_parts(name_parts_), semantic(std::make_shared<IdentifierSemanticImpl>())
{
assert(!name_parts.empty());
for (const auto & part [[maybe_unused]] : name_parts)
assert(!part.empty());
semantic->special = special;
semantic->legacy_compound = true;
if (!name_params.empty())
{
size_t params = 0;
for (const auto & part [[maybe_unused]] : name_parts)
{
if (part.empty())
++params;
}
assert(params == name_params.size());
children = std::move(name_params);
}
else
{
for (const auto & part [[maybe_unused]] : name_parts)
assert(!part.empty());
if (!special && name_parts.size() >= 2)
semantic->table = name_parts.end()[-2];
if (!special && name_parts.size() >= 2)
semantic->table = name_parts.end()[-2];
resetFullName();
resetFullName();
}
}
ASTPtr ASTIdentifier::getParam() const
{
assert(full_name.empty() && children.size() == 1);
return children.front()->clone();
}
ASTPtr ASTIdentifier::clone() const
@ -64,13 +86,16 @@ void ASTIdentifier::setShortName(const String & new_name)
const String & ASTIdentifier::name() const
{
assert(!name_parts.empty());
assert(!full_name.empty());
if (children.empty())
{
assert(!name_parts.empty());
assert(!full_name.empty());
}
return full_name;
}
void ASTIdentifier::formatImplWithoutAlias(const FormatSettings & settings, FormatState &, FormatStateStacked) const
void ASTIdentifier::formatImplWithoutAlias(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
auto format_element = [&](const String & elem_name)
{
@ -82,17 +107,24 @@ void ASTIdentifier::formatImplWithoutAlias(const FormatSettings & settings, Form
/// It could be compound but short
if (!isShort())
{
for (size_t i = 0, size = name_parts.size(); i < size; ++i)
for (size_t i = 0, j = 0, size = name_parts.size(); i < size; ++i)
{
if (i != 0)
settings.ostr << '.';
format_element(name_parts[i]);
if (name_parts[i].empty())
children[j++]->formatImpl(settings, state, frame);
else
format_element(name_parts[i]);
}
}
else
{
format_element(shortName());
const auto & name = shortName();
if (name.empty())
children.front()->formatImpl(settings, state, frame);
else
format_element(name);
}
}

View File

@ -2,6 +2,7 @@
#include <optional>
#include <Parsers/ASTQueryParameter.h>
#include <Parsers/ASTWithAlias.h>
#include <Core/UUID.h>
@ -17,15 +18,19 @@ struct StorageID;
/// Identifier (column, table or alias)
class ASTIdentifier : public ASTWithAlias
{
friend class ReplaceQueryParameterVisitor;
public:
UUID uuid = UUIDHelpers::Nil;
explicit ASTIdentifier(const String & short_name);
explicit ASTIdentifier(std::vector<String> && name_parts, bool special = false);
explicit ASTIdentifier(const String & short_name, ASTPtr && name_param = {});
explicit ASTIdentifier(std::vector<String> && name_parts, bool special = false, std::vector<ASTPtr> && name_params = {});
/** Get the text that identifies this element. */
String getID(char delim) const override { return "Identifier" + (delim + name()); }
/** Get the query param out of a non-compound identifier. */
ASTPtr getParam() const;
ASTPtr clone() const override;
void collectIdentifierNames(IdentifierNameSet & set) const override { set.insert(name()); }

View File

@ -146,7 +146,7 @@ bool ParserSubquery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
bool ParserIdentifier::parseImpl(Pos & pos, ASTPtr & node, Expected &)
bool ParserIdentifier::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
/// Identifier in backquotes or in double quotes
if (pos->type == TokenType::QuotedIdentifier)
@ -172,7 +172,51 @@ bool ParserIdentifier::parseImpl(Pos & pos, ASTPtr & node, Expected &)
++pos;
return true;
}
else if (allow_query_parameter && pos->type == TokenType::OpeningCurlyBrace)
{
++pos;
if (pos->type != TokenType::BareWord)
{
expected.add(pos, "substitution name (identifier)");
return false;
}
String name(pos->begin, pos->end);
++pos;
if (pos->type != TokenType::Colon)
{
expected.add(pos, "colon between name and type");
return false;
}
++pos;
if (pos->type != TokenType::BareWord)
{
expected.add(pos, "substitution type (identifier)");
return false;
}
String type(pos->begin, pos->end);
++pos;
if (type != "Identifier")
{
expected.add(pos, "substitution type (identifier)");
return false;
}
if (pos->type != TokenType::ClosingCurlyBrace)
{
expected.add(pos, "closing curly brace");
return false;
}
++pos;
node = std::make_shared<ASTIdentifier>("", std::make_shared<ASTQueryParameter>(name, type));
return true;
}
return false;
}
@ -180,14 +224,19 @@ bool ParserIdentifier::parseImpl(Pos & pos, ASTPtr & node, Expected &)
bool ParserCompoundIdentifier::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ASTPtr id_list;
if (!ParserList(std::make_unique<ParserIdentifier>(), std::make_unique<ParserToken>(TokenType::Dot), false)
.parse(pos, id_list, expected))
if (!ParserList(std::make_unique<ParserIdentifier>(allow_query_parameter), std::make_unique<ParserToken>(TokenType::Dot), false)
.parse(pos, id_list, expected))
return false;
std::vector<String> parts;
std::vector<ASTPtr> params;
const auto & list = id_list->as<ASTExpressionList &>();
for (const auto & child : list.children)
{
parts.emplace_back(getIdentifierName(child));
if (parts.back().empty())
params.push_back(child->as<ASTIdentifier>()->getParam());
}
ParserKeyword s_uuid("UUID");
UUID uuid = UUIDHelpers::Nil;
@ -201,7 +250,7 @@ bool ParserCompoundIdentifier::parseImpl(Pos & pos, ASTPtr & node, Expected & ex
uuid = parseFromString<UUID>(ast_uuid->as<ASTLiteral>()->value.get<String>());
}
node = std::make_shared<ASTIdentifier>(std::move(parts));
node = std::make_shared<ASTIdentifier>(std::move(parts), false, std::move(params));
node->as<ASTIdentifier>()->uuid = uuid;
return true;
@ -1174,7 +1223,7 @@ bool ParserAlias::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
bool ParserColumnsMatcher::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword columns("COLUMNS");
ParserList columns_p(std::make_unique<ParserCompoundIdentifier>(), std::make_unique<ParserToken>(TokenType::Comma), false);
ParserList columns_p(std::make_unique<ParserCompoundIdentifier>(true), std::make_unique<ParserToken>(TokenType::Comma), false);
ParserStringLiteral regex;
if (!columns.ignore(pos, expected))
@ -1252,7 +1301,7 @@ bool ParserColumnsTransformers::parseImpl(Pos & pos, ASTPtr & node, Expected & e
auto parse_id = [&identifiers, &pos, &expected]
{
ASTPtr identifier;
if (!ParserIdentifier().parse(pos, identifier, expected))
if (!ParserIdentifier(true).parse(pos, identifier, expected))
return false;
identifiers.emplace_back(std::move(identifier));
@ -1338,7 +1387,7 @@ bool ParserAsterisk::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
bool ParserQualifiedAsterisk::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
if (!ParserCompoundIdentifier().parse(pos, node, expected))
if (!ParserCompoundIdentifier(false, true).parse(pos, node, expected))
return false;
if (pos->type != TokenType::Dot)
@ -1475,7 +1524,7 @@ bool ParserExpressionElement::parseImpl(Pos & pos, ASTPtr & node, Expected & exp
|| ParserFunction().parse(pos, node, expected)
|| ParserQualifiedAsterisk().parse(pos, node, expected)
|| ParserAsterisk().parse(pos, node, expected)
|| ParserCompoundIdentifier().parse(pos, node, expected)
|| ParserCompoundIdentifier(false, true).parse(pos, node, expected)
|| ParserSubstitution().parse(pos, node, expected)
|| ParserMySQLGlobalVariable().parse(pos, node, expected);
}

View File

@ -39,12 +39,16 @@ protected:
/** An identifier, for example, x_yz123 or `something special`
* If allow_query_parameter_ = true, also parses substitutions in form {name:Identifier}
*/
class ParserIdentifier : public IParserBase
{
public:
ParserIdentifier(bool allow_query_parameter_ = false) : allow_query_parameter(allow_query_parameter_) {}
protected:
const char * getName() const override { return "identifier"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
bool allow_query_parameter;
};
@ -54,12 +58,16 @@ protected:
class ParserCompoundIdentifier : public IParserBase
{
public:
ParserCompoundIdentifier(bool table_name_with_optional_uuid_ = false)
: table_name_with_optional_uuid(table_name_with_optional_uuid_) {}
ParserCompoundIdentifier(bool table_name_with_optional_uuid_ = false, bool allow_query_parameter_ = false)
: table_name_with_optional_uuid(table_name_with_optional_uuid_), allow_query_parameter(allow_query_parameter_)
{
}
protected:
const char * getName() const override { return "compound identifier"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
bool table_name_with_optional_uuid;
bool allow_query_parameter;
};
/// Just *
@ -299,6 +307,17 @@ private:
};
/** Prepared statements.
* Parse query with parameter expression {name:type}.
*/
class ParserIdentifierOrSubstitution : public IParserBase
{
protected:
const char * getName() const override { return "identifier or substitution"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/** Prepared statements.
* Parse query with parameter expression {name:type}.
*/

View File

@ -1,13 +1,38 @@
#include <Parsers/ParserDataType.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ParserCreateQuery.h>
namespace DB
{
namespace
{
/// Wrapper to allow mixed lists of nested and normal types.
class ParserNestedTableOrExpression : public IParserBase
{
private:
const char * getName() const override { return "data type or expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override
{
ParserNestedTable parser1;
if (parser1.parse(pos, node, expected))
return true;
ParserExpression parser2;
return parser2.parse(pos, node, expected);
}
};
}
bool ParserDataType::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserNestedTable nested;
@ -78,7 +103,7 @@ bool ParserDataType::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
++pos;
/// Parse optional parameters
ParserList args_parser(std::make_unique<ParserExpression>(), std::make_unique<ParserToken>(TokenType::Comma));
ParserList args_parser(std::make_unique<ParserNestedTableOrExpression>(), std::make_unique<ParserToken>(TokenType::Comma));
ASTPtr expr_list_args;
if (!args_parser.parse(pos, expr_list_args, expected))

View File

@ -23,7 +23,7 @@ bool ParserTableExpression::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
if (!ParserWithOptionalAlias(std::make_unique<ParserSubquery>(), true).parse(pos, res->subquery, expected)
&& !ParserWithOptionalAlias(std::make_unique<ParserFunction>(), true).parse(pos, res->table_function, expected)
&& !ParserWithOptionalAlias(std::make_unique<ParserCompoundIdentifier>(), true).parse(pos, res->database_and_table_name, expected))
&& !ParserWithOptionalAlias(std::make_unique<ParserCompoundIdentifier>(false, true), true).parse(pos, res->database_and_table_name, expected))
return false;
/// FINAL

View File

@ -22,12 +22,11 @@
#include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/queryToString.h>
#include <Storages/AlterCommands.h>
#include <Storages/IStorage.h>
#include <Common/typeid_cast.h>
#include <Parsers/queryToString.h>
#include <Common/randomSeed.h>
namespace DB
@ -1117,7 +1116,7 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, const Con
data_type_ptr = command.data_type;
const auto & final_column_name = column_name;
const auto tmp_column_name = final_column_name + "_tmp";
const auto tmp_column_name = final_column_name + "_tmp_alter" + toString(randomSeed());
default_expr_list->children.emplace_back(setAlias(
addTypeConversionToAST(std::make_shared<ASTIdentifier>(tmp_column_name), data_type_ptr->getName()),
@ -1133,7 +1132,7 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, const Con
continue;
const auto & final_column_name = column_name;
const auto tmp_column_name = final_column_name + "_tmp";
const auto tmp_column_name = final_column_name + "_tmp_alter" + toString(randomSeed());
const auto data_type_ptr = command.data_type;
default_expr_list->children.emplace_back(setAlias(

View File

@ -449,6 +449,10 @@ public:
/// We do not use mutex because it is not very important that the size could change during the operation.
virtual void checkPartitionCanBeDropped(const ASTPtr & /*partition*/) {}
/// Returns true if Storage may store some data on disk.
/// NOTE: may not be equivalent to !getDataPaths().empty()
virtual bool storesDataOnDisk() const { return false; }
/// Returns data paths if storage supports it, empty vector otherwise.
virtual Strings getDataPaths() const { return {}; }

View File

@ -617,6 +617,7 @@ public:
/// `additional_path` can be set if part is not located directly in table data path (e.g. 'detached/')
std::optional<String> getFullRelativePathForPart(const String & part_name, const String & additional_path = "") const;
bool storesDataOnDisk() const override { return true; }
Strings getDataPaths() const override;
using PathsWithDisks = std::vector<PathWithDisk>;

View File

@ -1237,144 +1237,200 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
if (sum_marks > max_marks_to_use_cache)
use_uncompressed_cache = false;
Pipe pipe;
{
Pipes pipes;
for (const auto & part : parts)
{
auto source_processor = std::make_shared<MergeTreeSelectProcessor>(
data, metadata_snapshot, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges,
use_uncompressed_cache,
query_info.prewhere_info, true, reader_settings,
virt_columns, part.part_index_in_query);
pipes.emplace_back(std::move(source_processor));
}
pipe = Pipe::unitePipes(std::move(pipes));
}
/// Drop temporary columns, added by 'sorting_key_expr'
if (!out_projection)
out_projection = createProjection(pipe, data);
pipe.addSimpleTransform([&metadata_snapshot](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, metadata_snapshot->getSortingKey().expression);
});
Names sort_columns = metadata_snapshot->getSortingKeyColumns();
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names;
Block header = pipe.getHeader();
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
auto get_merging_processor = [&]() -> MergingTransformPtr
{
switch (data.merging_params.mode)
{
case MergeTreeData::MergingParams::Ordinary:
{
return std::make_shared<MergingSortedTransform>(header, pipe.numOutputPorts(),
sort_description, max_block_size);
}
case MergeTreeData::MergingParams::Collapsing:
return std::make_shared<CollapsingSortedTransform>(header, pipe.numOutputPorts(),
sort_description, data.merging_params.sign_column, true, max_block_size);
case MergeTreeData::MergingParams::Summing:
return std::make_shared<SummingSortedTransform>(header, pipe.numOutputPorts(),
sort_description, data.merging_params.columns_to_sum, partition_key_columns, max_block_size);
case MergeTreeData::MergingParams::Aggregating:
return std::make_shared<AggregatingSortedTransform>(header, pipe.numOutputPorts(),
sort_description, max_block_size);
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedTransform>(header, pipe.numOutputPorts(),
sort_description, data.merging_params.version_column, max_block_size);
case MergeTreeData::MergingParams::VersionedCollapsing:
return std::make_shared<VersionedCollapsingTransform>(header, pipe.numOutputPorts(),
sort_description, data.merging_params.sign_column, max_block_size);
case MergeTreeData::MergingParams::Graphite:
throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
}
__builtin_unreachable();
};
if (num_streams > settings.max_final_threads)
num_streams = settings.max_final_threads;
if (num_streams <= 1 || sort_description.empty())
/// If setting do_not_merge_across_partitions_select_final is true then we won't merge parts from different partitions.
/// We have all parts in the parts vector, where parts with the same partition are adjacent.
/// So we will store iterators pointing to the beginning of each partition range (and parts.end()),
/// then we will create a pipe for each partition that will run the selecting and merging processors
/// for the parts with this partition. In the end we will unite all the pipes.
std::vector<RangesInDataParts::iterator> parts_to_merge_ranges;
auto it = parts.begin();
parts_to_merge_ranges.push_back(it);
if (settings.do_not_merge_across_partitions_select_final)
{
pipe.addTransform(get_merging_processor());
return pipe;
}
ColumnNumbers key_columns;
key_columns.reserve(sort_description.size());
for (auto & desc : sort_description)
{
if (!desc.column_name.empty())
key_columns.push_back(header.getPositionByName(desc.column_name));
else
key_columns.emplace_back(desc.column_number);
}
pipe.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<AddingSelectorTransform>(stream_header, num_streams, key_columns);
});
pipe.transform([&](OutputPortRawPtrs ports)
{
Processors processors;
std::vector<OutputPorts::iterator> output_ports;
processors.reserve(ports.size() + num_streams);
output_ports.reserve(ports.size());
for (auto & port : ports)
while (it != parts.end())
{
auto copier = std::make_shared<CopyTransform>(header, num_streams);
connect(*port, copier->getInputPort());
output_ports.emplace_back(copier->getOutputs().begin());
processors.emplace_back(std::move(copier));
it = std::find_if(
it, parts.end(), [&it](auto & part) { return it->data_part->info.partition_id != part.data_part->info.partition_id; });
parts_to_merge_ranges.push_back(it);
}
/// We divide threads for each partition equally. But we will create at least as many threads as there are partitions.
/// (So the total number of threads could be more than the initial num_streams.)
num_streams /= (parts_to_merge_ranges.size() - 1);
}
else
{
/// If do_not_merge_across_partitions_select_final is false we just merge all the parts.
parts_to_merge_ranges.push_back(parts.end());
}
Pipes partition_pipes;
for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index)
{
Pipe pipe;
for (size_t i = 0; i < num_streams; ++i)
{
auto merge = get_merging_processor();
merge->setSelectorPosition(i);
auto input = merge->getInputs().begin();
Pipes pipes;
/// Connect i-th merge with i-th input port of every copier.
for (size_t j = 0; j < ports.size(); ++j)
for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it)
{
connect(*output_ports[j], *input);
++output_ports[j];
++input;
auto source_processor = std::make_shared<MergeTreeSelectProcessor>(
data,
metadata_snapshot,
part_it->data_part,
max_block_size,
settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes,
column_names,
part_it->ranges,
use_uncompressed_cache,
query_info.prewhere_info,
true,
reader_settings,
virt_columns,
part_it->part_index_in_query);
pipes.emplace_back(std::move(source_processor));
}
processors.emplace_back(std::move(merge));
pipe = Pipe::unitePipes(std::move(pipes));
}
return processors;
});
/// Drop temporary columns, added by 'sorting_key_expr'
if (!out_projection)
out_projection = createProjection(pipe, data);
return pipe;
/// If do_not_merge_across_partitions_select_final is true and there is only one part in a partition
/// with level > 0, then we won't postprocess this part
if (settings.do_not_merge_across_partitions_select_final &&
std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
parts_to_merge_ranges[range_index]->data_part->info.level > 0)
{
partition_pipes.emplace_back(std::move(pipe));
continue;
}
pipe.addSimpleTransform([&metadata_snapshot](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, metadata_snapshot->getSortingKey().expression);
});
Names sort_columns = metadata_snapshot->getSortingKeyColumns();
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names;
Block header = pipe.getHeader();
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
auto get_merging_processor = [&]() -> MergingTransformPtr
{
switch (data.merging_params.mode)
{
case MergeTreeData::MergingParams::Ordinary:
{
return std::make_shared<MergingSortedTransform>(header, pipe.numOutputPorts(), sort_description, max_block_size);
}
case MergeTreeData::MergingParams::Collapsing:
return std::make_shared<CollapsingSortedTransform>(
header, pipe.numOutputPorts(), sort_description, data.merging_params.sign_column, true, max_block_size);
case MergeTreeData::MergingParams::Summing:
return std::make_shared<SummingSortedTransform>(
header,
pipe.numOutputPorts(),
sort_description,
data.merging_params.columns_to_sum,
partition_key_columns,
max_block_size);
case MergeTreeData::MergingParams::Aggregating:
return std::make_shared<AggregatingSortedTransform>(header, pipe.numOutputPorts(), sort_description, max_block_size);
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedTransform>(
header, pipe.numOutputPorts(), sort_description, data.merging_params.version_column, max_block_size);
case MergeTreeData::MergingParams::VersionedCollapsing:
return std::make_shared<VersionedCollapsingTransform>(
header, pipe.numOutputPorts(), sort_description, data.merging_params.sign_column, max_block_size);
case MergeTreeData::MergingParams::Graphite:
throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
}
__builtin_unreachable();
};
if (num_streams <= 1 || sort_description.empty())
{
pipe.addTransform(get_merging_processor());
partition_pipes.emplace_back(std::move(pipe));
continue;
}
ColumnNumbers key_columns;
key_columns.reserve(sort_description.size());
for (auto & desc : sort_description)
{
if (!desc.column_name.empty())
key_columns.push_back(header.getPositionByName(desc.column_name));
else
key_columns.emplace_back(desc.column_number);
}
pipe.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<AddingSelectorTransform>(stream_header, num_streams, key_columns);
});
pipe.transform([&](OutputPortRawPtrs ports)
{
Processors processors;
std::vector<OutputPorts::iterator> output_ports;
processors.reserve(ports.size() + num_streams);
output_ports.reserve(ports.size());
for (auto & port : ports)
{
auto copier = std::make_shared<CopyTransform>(header, num_streams);
connect(*port, copier->getInputPort());
output_ports.emplace_back(copier->getOutputs().begin());
processors.emplace_back(std::move(copier));
}
for (size_t i = 0; i < num_streams; ++i)
{
auto merge = get_merging_processor();
merge->setSelectorPosition(i);
auto input = merge->getInputs().begin();
/// Connect i-th merge with i-th input port of every copier.
for (size_t j = 0; j < ports.size(); ++j)
{
connect(*output_ports[j], *input);
++output_ports[j];
++input;
}
processors.emplace_back(std::move(merge));
}
return processors;
});
partition_pipes.emplace_back(std::move(pipe));
}
return Pipe::unitePipes(std::move(partition_pipes));
}
/// Calculates a set of mark ranges that could possibly contain the keys required by the condition.

View File

@@ -112,7 +112,6 @@ struct Settings;
/** Obsolete settings. Kept for backward compatibility only. */ \
M(UInt64, min_relative_delay_to_yield_leadership, 120, "Obsolete setting, does nothing.", 0) \
M(UInt64, check_delay_period, 60, "Obsolete setting, does nothing.", 0) \
/// Settings that should not change after the creation of a table.
#define APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS(M) \
M(index_granularity)

View File

@@ -82,6 +82,7 @@ public:
void shutdown() override;
void drop() override;
bool storesDataOnDisk() const override { return true; }
Strings getDataPaths() const override;
const ExpressionActionsPtr & getShardingKeyExpr() const { return sharding_key_expr; }

View File

@@ -524,6 +524,11 @@ BlockOutputStreamPtr StorageFile::write(
chooseCompressionMethod(path, compression_method), context);
}
bool StorageFile::storesDataOnDisk() const
{
return is_db_table;
}
Strings StorageFile::getDataPaths() const
{
if (paths.empty())

View File

@@ -46,6 +46,7 @@ public:
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
bool storesDataOnDisk() const override;
Strings getDataPaths() const override;
struct CommonArguments

View File

@@ -41,6 +41,7 @@ public:
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) override;
bool storesDataOnDisk() const override { return true; }
Strings getDataPaths() const override { return {DB::fullPath(disk, table_path)}; }
protected:

View File

@@ -144,6 +144,7 @@ public:
CheckResults checkData(const ASTPtr & query , const Context & context) override { return getNested()->checkData(query, context); }
void checkTableCanBeDropped() const override { getNested()->checkTableCanBeDropped(); }
void checkPartitionCanBeDropped(const ASTPtr & partition) override { getNested()->checkPartitionCanBeDropped(partition); }
bool storesDataOnDisk() const override { return getNested()->storesDataOnDisk(); }
Strings getDataPaths() const override { return getNested()->getDataPaths(); }
StoragePolicyPtr getStoragePolicy() const override { return getNested()->getStoragePolicy(); }
std::optional<UInt64> totalRows() const override { return getNested()->totalRows(); }

View File

@@ -24,6 +24,7 @@ public:
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context) override;
bool storesDataOnDisk() const override { return true; }
Strings getDataPaths() const override { return {path}; }
protected:

View File

@@ -40,6 +40,7 @@ public:
CheckResults checkData(const ASTPtr & /* query */, const Context & /* context */) override;
bool storesDataOnDisk() const override { return true; }
Strings getDataPaths() const override { return {DB::fullPath(disk, table_path)}; }
void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder&) override;

View File

@@ -39,6 +39,7 @@ public:
CheckResults checkData(const ASTPtr & /* query */, const Context & /* context */) override;
bool storesDataOnDisk() const override { return true; }
Strings getDataPaths() const override { return {DB::fullPath(disk, table_path)}; }
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) override;

View File

@@ -0,0 +1,4 @@
<yandex>
<!-- This update period also syncs MemoryTracking with RSS; effectively disable the sync by using a period of 1 day. -->
<asynchronous_metrics_update_period_s>86400</asynchronous_metrics_update_period_s>
</yandex>

View File

@@ -8,7 +8,10 @@ from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance', main_configs=['configs/conf.xml'])
instance = cluster.add_instance('instance', main_configs=[
'configs/conf.xml',
'configs/asynchronous_metrics_update_period_s.xml',
])
@pytest.fixture(scope='module', autouse=True)

View File

@@ -0,0 +1,23 @@
<test>
<settings>
<do_not_merge_across_partitions_select_final>1</do_not_merge_across_partitions_select_final>
</settings>
<create_query>
CREATE TABLE optimized_select_final (t DateTime, x Int32)
ENGINE = ReplacingMergeTree()
PARTITION BY toYYYYMM(t) ORDER BY x
</create_query>
<fill_query>INSERT INTO optimized_select_final SELECT toDate('2000-01-01'), number FROM numbers(5000000)</fill_query>
<fill_query>INSERT INTO optimized_select_final SELECT toDate('2020-01-01'), number FROM numbers(5000000)</fill_query>
<fill_query>INSERT INTO optimized_select_final SELECT toDate('2021-01-01'), number FROM numbers(5000000)</fill_query>
<fill_query>INSERT INTO optimized_select_final SELECT toDate('2022-01-01'), number FROM numbers(5000000)</fill_query>
<fill_query>OPTIMIZE TABLE optimized_select_final</fill_query>
<query>SELECT * FROM optimized_select_final FINAL FORMAT Null</query>
<drop_query>DROP TABLE IF EXISTS optimized_select_final</drop_query>
</test>

View File

@@ -1,7 +1,7 @@
CREATE TABLE test
(
`a1` UInt64 DEFAULT a + 1,
`a1` UInt64 DEFAULT a + 1,
`a0` UInt64 DEFAULT a1 + 1,
`a1` UInt64 DEFAULT a0 + 1,
`a2` UInt64 DEFAULT a3 + a4,
`a3` UInt64 DEFAULT a2 + 1,
`a4` UInt64 ALIAS a3 + 1

View File

@@ -3,6 +3,7 @@
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "DROP DATABASE IF EXISTS test_01114_1"
$CLICKHOUSE_CLIENT -q "DROP DATABASE IF EXISTS test_01114_2"
$CLICKHOUSE_CLIENT -q "DROP DATABASE IF EXISTS test_01114_3"

View File

@@ -49,4 +49,4 @@ $CLICKHOUSE_CLIENT -q "SELECT if(quantile(0.5)(query_duration_ms) < $max_time_ms
$CLICKHOUSE_CLIENT -q "SELECT count() * $count_multiplier, i, d, s, n.i, n.f FROM $db.table_merge GROUP BY i, d, s, n.i, n.f ORDER BY i"
$CLICKHOUSE_CLIENT -q "DROP DATABASE $db" --database_atomic_wait_for_drop_and_detach_synchronously=0
$CLICKHOUSE_CLIENT -q "DROP DATABASE $db"

View File

@@ -1,4 +1,5 @@
DROP TABLE IF EXISTS test_01307;
CREATE TABLE test_01307 (id UInt64, val String, INDEX ind val TYPE bloom_filter() GRANULARITY 1) ENGINE = MergeTree() ORDER BY id SETTINGS index_granularity = 2;
INSERT INTO test_01307 (id, val) select number as id, toString(number) as val from numbers(4);
SELECT count() FROM test_01307 WHERE identity(val) = '2';
@@ -6,3 +7,5 @@ SELECT count() FROM test_01307 WHERE val = '2';
OPTIMIZE TABLE test_01307 FINAL;
SELECT count() FROM test_01307 WHERE identity(val) = '2';
SELECT count() FROM test_01307 WHERE val = '2';
DROP TABLE test_01307;

View File

@@ -0,0 +1,6 @@
2000-01-01 00:00:00 0
2020-01-01 00:00:00 0
2000-01-01 00:00:00 1
2020-01-01 00:00:00 1
2000-01-01 00:00:00 2
2020-01-01 00:00:00 2

View File

@@ -0,0 +1,15 @@
DROP TABLE IF EXISTS select_final;
CREATE TABLE select_final (t DateTime, x Int32) ENGINE = ReplacingMergeTree() PARTITION BY toYYYYMM(t) ORDER BY x;
INSERT INTO select_final SELECT toDate('2000-01-01'), number FROM numbers(2);
INSERT INTO select_final SELECT toDate('2000-01-01'), number + 1 FROM numbers(2);
INSERT INTO select_final SELECT toDate('2020-01-01'), number FROM numbers(2);
INSERT INTO select_final SELECT toDate('2020-01-01'), number + 1 FROM numbers(2);
SELECT * FROM select_final FINAL ORDER BY x SETTINGS do_not_merge_across_partitions_select_final = 1;
DROP TABLE select_final;

View File

@@ -0,0 +1,5 @@
a Tuple(key String, value String)
a Tuple(Tuple(key String, value String))
a.key Array(String)
a.value Array(String)
a Tuple(UInt8, Tuple(key String, value String))

View File

@@ -0,0 +1,21 @@
DROP TABLE IF EXISTS test_01532_1;
DROP TABLE IF EXISTS test_01532_2;
DROP TABLE IF EXISTS test_01532_3;
DROP TABLE IF EXISTS test_01532_4;
CREATE TABLE test_01532_1 (a Tuple(key String, value String)) ENGINE Memory();
DESCRIBE TABLE test_01532_1;
CREATE TABLE test_01532_2 (a Tuple(Tuple(key String, value String))) ENGINE Memory();
DESCRIBE TABLE test_01532_2;
CREATE TABLE test_01532_3 (a Array(Tuple(key String, value String))) ENGINE Memory();
DESCRIBE TABLE test_01532_3;
CREATE TABLE test_01532_4 (a Tuple(UInt8, Tuple(key String, value String))) ENGINE Memory();
DESCRIBE TABLE test_01532_4;
DROP TABLE test_01532_1;
DROP TABLE test_01532_2;
DROP TABLE test_01532_3;
DROP TABLE test_01532_4;

View File

@@ -0,0 +1,4 @@
0
0
1
1

View File

@@ -0,0 +1,54 @@
set log_queries_min_query_duration_ms=300000;
set log_query_threads=1;
set log_queries=1;
--
-- fast -- no logging
--
select '01546_log_queries_min_query_duration_ms-fast' format Null;
system flush logs;
-- No logging, since the query is fast enough.
select count()
from system.query_log
where
query like '%01546_log_queries_min_query_duration_ms-fast%'
and query not like '%system.query_log%'
and current_database = currentDatabase()
and event_date = today()
and event_time >= now() - interval 1 minute;
select count()
from system.query_thread_log
where
query like '%01546_log_queries_min_query_duration_ms-fast%'
and query not like '%system.query_thread_log%'
and current_database = currentDatabase()
and event_date = today()
and event_time >= now() - interval 1 minute;
--
-- slow -- query logged
--
set log_queries_min_query_duration_ms=300;
select '01546_log_queries_min_query_duration_ms-slow', sleep(0.4) format Null;
system flush logs;
-- With the limit on minimum execution time, "query start" and "exception before start" events are not logged; only the query finish is.
select count()
from system.query_log
where
query like '%01546_log_queries_min_query_duration_ms-slow%'
and query not like '%system.query_log%'
and current_database = currentDatabase()
and event_date = today()
and event_time >= now() - interval 1 minute;
-- There are at least two threads involved in a simple query
-- (one thread just waits for another, sigh)
select count() == 2
from system.query_thread_log
where
query like '%01546_log_queries_min_query_duration_ms-slow%'
and query not like '%system.query_thread_log%'
and current_database = currentDatabase()
and event_date = today()
and event_time >= now() - interval 1 minute;

View File

@@ -0,0 +1,148 @@
DROP TABLE IF EXISTS fct_rt_dc_shop_sku_vender_day;
create table fct_rt_dc_shop_sku_vender_day
(
stat_year UInt16,
stat_month UInt32,
stat_day Date,
out_buid UInt8,
out_shop_id String,
in_shop_id LowCardinality(String),
datasource UInt8,
venderid String,
categorytreeid UInt8,
categoryid String,
goodsid LowCardinality(String),
logistics UInt8,
buntype UInt8,
dctype UInt8,
shopformid UInt8,
rt_qty Decimal(18,4),
rt_cost Decimal(18,4),
rt_taxcost Decimal(18,4),
rt_boxes Decimal(18,4),
rt_shops Nullable(String),
rt_drygood_qty Decimal(18,4),
rt_drygood_cost Decimal(18,4),
rt_drygood_boxes Decimal(18,4),
rt_drygood_shops LowCardinality(Nullable(String)),
rt_fresh_qty Decimal(18,4),
rt_fresh_cost Decimal(18,4),
rt_fresh_shops LowCardinality(Nullable(String)),
rt_supshop_cost Decimal(18,4),
rt_supshop_qty Decimal(18,4),
rt_supshop_boxes Decimal(18,4),
rt_supshop_shops LowCardinality(Nullable(String)),
rt_smallshop_cost Decimal(18,4),
rt_smallshop_qty Decimal(18,4),
rt_smallshop_boxes Decimal(18,4),
rt_smallshop_shops LowCardinality(Nullable(String)),
rt_dc_cost Decimal(18,4),
rt_dc_qty Decimal(18,4),
rt_dc_boxes Decimal(18,4),
rt_dc_shops LowCardinality(Nullable(String)),
rt_drygood_supshop_cost Decimal(18,4),
rt_drygood_supshop_qty Decimal(18,4),
rt_drygood_supshop_boxes Decimal(18,4),
rt_drygood_supshop_shops LowCardinality(Nullable(String)),
rt_drygood_smallshop_cost Decimal(18,4),
rt_drygood_smallshop_qty Decimal(18,4),
rt_drygood_smallshop_boxes Decimal(18,4),
rt_drygood_smallshop_shops LowCardinality(Nullable(String)),
rt_drygood_dc_cost Decimal(18,4),
rt_drygood_dc_qty Decimal(18,4),
rt_drygood_dc_boxes Decimal(18,4),
rt_drygood_dc_shops LowCardinality(Nullable(String)),
rt_fresh_supshop_cost Decimal(18,4),
rt_fresh_supshop_qty Decimal(18,4),
rt_fresh_supshop_shops LowCardinality(Nullable(String)),
rt_fresh_smallshop_cost Decimal(18,4),
rt_fresh_smallshop_qty Decimal(18,4),
rt_fresh_smallshop_shops LowCardinality(Nullable(String)),
rt_fresh_dc_cost Decimal(18,4),
rt_fresh_dc_qty Decimal(18,4),
rt_fresh_dc_shops LowCardinality(Nullable(String)),
stat_day_num String default formatDateTime(stat_day, '%F')
)
engine = MergeTree PARTITION BY toYYYYMM(stat_day) ORDER BY (stat_day, out_shop_id) SETTINGS index_granularity = 8192
;
select stat_year,
stat_month,
out_buid,
out_shop_id,
in_shop_id,
datasource,
venderid,
categorytreeid,
categoryid,
goodsid,
logistics,
buntype,
dctype,
shopformid,
sum(rt_qty),
sum(rt_cost),
sum(rt_taxcost),
sum(rt_boxes),
max(rt_shops),
sum(rt_drygood_qty),
sum(rt_drygood_cost),
sum(rt_drygood_boxes),
max(rt_drygood_shops),
sum(rt_fresh_qty),
sum(rt_fresh_cost),
max(rt_fresh_shops),
sum(rt_supshop_cost),
sum(rt_supshop_qty),
sum(rt_supshop_boxes),
max(rt_supshop_shops),
sum(rt_smallshop_cost),
sum(rt_smallshop_qty),
sum(rt_smallshop_boxes),
max(rt_smallshop_shops),
sum(rt_dc_cost),
sum(rt_dc_qty),
sum(rt_dc_boxes),
max(rt_dc_shops),
sum(rt_drygood_supshop_cost),
sum(rt_drygood_supshop_qty),
sum(rt_drygood_supshop_boxes),
max(rt_drygood_supshop_shops),
sum(rt_drygood_smallshop_cost),
sum(rt_drygood_smallshop_qty),
sum(rt_drygood_smallshop_boxes),
max(rt_drygood_smallshop_shops),
sum(rt_drygood_dc_cost),
sum(rt_drygood_dc_qty),
sum(rt_drygood_dc_boxes),
max(rt_drygood_dc_shops),
sum(rt_fresh_supshop_cost),
sum(rt_fresh_supshop_qty),
max(rt_fresh_supshop_shops),
sum(rt_fresh_smallshop_cost),
sum(rt_fresh_smallshop_qty),
max(rt_fresh_smallshop_shops),
sum(rt_fresh_dc_cost),
sum(rt_fresh_dc_qty),
max(rt_fresh_dc_shops)
from fct_rt_dc_shop_sku_vender_day frdssvd
where stat_day >= toDate('2016-01-01')
and stat_day < addMonths(toDate('2016-01-01'), 1)
group by stat_year,
stat_month,
out_buid,
out_shop_id,
in_shop_id,
datasource,
venderid,
categorytreeid,
categoryid,
goodsid,
logistics,
buntype,
dctype,
shopformid;
DROP TABLE fct_rt_dc_shop_sku_vender_day;

View File

@@ -0,0 +1,3 @@
5754696928334414137 test
HASH_VAL UInt64
STR_VAL LowCardinality(String)

View File

@@ -0,0 +1,12 @@
DROP TABLE IF EXISTS HASH_MV;
DROP TABLE IF EXISTS HASH_TEST_INSERT;
CREATE TABLE HASH_TEST_INSERT (STR_VAL String) ENGINE = Null;
CREATE MATERIALIZED VIEW HASH_MV (HASH_VAL UInt64, STR_VAL LowCardinality(String)) ENGINE = ReplacingMergeTree ORDER BY HASH_VAL AS SELECT xxHash64(STR_VAL) AS HASH_VAL, toLowCardinality(STR_VAL) AS STR_VAL FROM HASH_TEST_INSERT;
INSERT INTO HASH_TEST_INSERT VALUES ('test');
SELECT * FROM HASH_MV;
DESC (SELECT * FROM HASH_MV);
DROP TABLE HASH_MV;
DROP TABLE HASH_TEST_INSERT;

View File

@@ -0,0 +1,2 @@
1 2
---

View File

@@ -0,0 +1,11 @@
DROP TABLE IF EXISTS t;
CREATE TABLE t(`id` String, `dealer_id` String) ENGINE = MergeTree() ORDER BY id SETTINGS index_granularity = 8192;
insert into t(id, dealer_id) values('1','2');
SELECT * FROM t;
SET mutations_sync = 1;
ALTER TABLE t DELETE WHERE id in (select id from t as tmp);
SELECT '---';
SELECT * FROM t;
DROP TABLE t;

View File

@@ -0,0 +1,5 @@
0
0
0
0
45

View File

@@ -0,0 +1,10 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT --param_tbl 'numbers' --query 'select * from system.{tbl:Identifier} limit 1'
$CLICKHOUSE_CLIENT --param_db 'system' --param_tbl 'numbers' --query 'select * from {db:Identifier}.{tbl:Identifier} limit 1'
$CLICKHOUSE_CLIENT --param_col 'number' --query 'select {col:Identifier} from system.numbers limit 1'
$CLICKHOUSE_CLIENT --param_col 'number' --query 'select a.{col:Identifier} from system.numbers a limit 1'
$CLICKHOUSE_CLIENT --param_tbl 'numbers' --param_col 'number' --query 'select sum({tbl:Identifier}.{col:Identifier}) FROM (select * from system.{tbl:Identifier} limit 10) numbers'

View File

@@ -0,0 +1,10 @@
DROP TABLE IF EXISTS f;
DROP TABLE IF EXISTS v;
create table f(s String) engine File(TSV, '/dev/null');
create view v as (select * from f);
select * from v; -- was failing a long time ago
select * from merge('', 'f'); -- was failing a long time ago
DROP TABLE f;
DROP TABLE v;

View File

@@ -0,0 +1,3 @@
DROP TABLE IF EXISTS test;
CREATE TABLE test(test String DEFAULT 'test', test_tmp Int DEFAULT 1) ENGINE = Memory;
DROP TABLE test;

View File

@@ -0,0 +1,2 @@
Hello\0World

View File

@@ -0,0 +1,20 @@
DROP TABLE IF EXISTS src;
CREATE TABLE src (k UInt64, s FixedString(11)) ENGINE = Memory;
INSERT INTO src VALUES (1, 'Hello\0World');
DROP DICTIONARY IF EXISTS dict;
CREATE DICTIONARY dict
(
k UInt64,
s String
)
PRIMARY KEY k
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER default TABLE 'src'))
LAYOUT(FLAT)
LIFETIME(MIN 10 MAX 10);
SELECT dictGet(currentDatabase() || '.dict', 's', number) FROM numbers(2);
DROP TABLE src;
DROP DICTIONARY dict;

View File

@@ -0,0 +1,43 @@
number
0
number
1
0
1
2
3
4
5
6
7
8
9
{
"meta":
[
{
"name": "number",
"type": "UInt64"
}
],
"data":
[
["0"],
["1"],
["2"],
["3"],
["4"],
["5"],
["6"],
["7"],
["8"],
["9"]
],
"rows": 10,
"rows_before_limit_at_least": 10
}

View File

@@ -0,0 +1,8 @@
select * from numbers(100) settings max_result_rows = 1; -- { serverError 396 }
select * from numbers(100) FORMAT JSON settings max_result_rows = 1; -- { serverError 396 }
SET max_result_rows = 1;
select * from numbers(10); -- { serverError 396 }
select * from numbers(10) SETTINGS result_overflow_mode = 'break', max_block_size = 1 FORMAT PrettySpaceNoEscapes;
select * from numbers(10) settings max_result_rows = 10;
select * from numbers(10) FORMAT JSONCompact settings max_result_rows = 10, output_format_write_statistics = 0;

View File

@@ -155,5 +155,7 @@
01509_dictionary_preallocate
01526_max_untracked_memory
01530_drop_database_atomic_sync
01546_log_queries_min_query_duration_ms
01547_query_log_current_database
01548_query_log_query_execution_ms
01552_dict_fixedstring

View File

@@ -262,7 +262,7 @@ def process_unknown_commits(commits, commits_info, users):
# Returns False if the PR should not be mentioned in the changelog.
def parse_one_pull_request(item):
description = item['description']
lines = [line for line in [x.strip() for x in description.split('\n') if description else []] if line]
lines = [line for line in [x.strip() for x in description.split('\n')] if line] if description else []
lines = [re.sub(r'\s+', ' ', l) for l in lines]
cat_pos = None