Merge branch 'master' of https://github.com/ClickHouse/ClickHouse into support-apple-m1

changvvb 2021-04-09 23:19:19 +08:00
commit facbb0368b
107 changed files with 1294 additions and 574 deletions

View File

@ -11,7 +11,7 @@ if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/NuRaft/CMakeLists.txt")
return()
endif ()
if (NOT OS_FREEBSD AND NOT OS_DARWIN)
if (NOT OS_FREEBSD)
set (USE_NURAFT 1)
set (NURAFT_LIBRARY nuraft)

View File

@ -124,12 +124,14 @@ target_include_directories(jemalloc SYSTEM PRIVATE
target_compile_definitions(jemalloc PRIVATE -DJEMALLOC_NO_PRIVATE_NAMESPACE)
if (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG")
target_compile_definitions(jemalloc PRIVATE -DJEMALLOC_DEBUG=1 -DJEMALLOC_PROF=1)
target_compile_definitions(jemalloc PRIVATE -DJEMALLOC_DEBUG=1)
endif ()
if (USE_UNWIND)
target_compile_definitions (jemalloc PRIVATE -DJEMALLOC_PROF_LIBUNWIND=1)
target_link_libraries (jemalloc PRIVATE unwind)
endif ()
target_compile_definitions(jemalloc PRIVATE -DJEMALLOC_PROF=1)
if (USE_UNWIND)
target_compile_definitions (jemalloc PRIVATE -DJEMALLOC_PROF_LIBUNWIND=1)
target_link_libraries (jemalloc PRIVATE unwind)
endif ()
target_compile_options(jemalloc PRIVATE -Wno-redundant-decls)

@ -1 +1 @@
Subproject commit f4476ee7311b35b593750f6ae2cbdb62a4006374
Subproject commit 5f4034a3a6376416504f17186c55fe401c6d8e5e

View File

@ -198,7 +198,7 @@ case "$stage" in
# Lost connection to the server. This probably means that the server died
# with abort.
echo "failure" > status.txt
if ! grep -ao "Received signal.*\|Logical error.*\|Assertion.*failed\|Failed assertion.*\|.*runtime error: .*\|.*is located.*\|SUMMARY: MemorySanitizer:.*\|SUMMARY: ThreadSanitizer:.*\|.*_LIBCPP_ASSERT.*" server.log > description.txt
if ! grep -ao "Received signal.*\|Logical error.*\|Assertion.*failed\|Failed assertion.*\|.*runtime error: .*\|.*is located.*\|SUMMARY: AddressSanitizer:.*\|SUMMARY: MemorySanitizer:.*\|SUMMARY: ThreadSanitizer:.*\|.*_LIBCPP_ASSERT.*" server.log > description.txt
then
echo "Lost connection to server. See the logs." > description.txt
fi

View File

@ -108,6 +108,11 @@ zgrep -Fav "ASan doesn't fully support makecontext/swapcontext functions" > /dev
|| echo -e 'No sanitizer asserts\tOK' >> /test_output/test_results.tsv
rm -f /test_output/tmp
# OOM
zgrep -Fa " <Fatal> Application: Child process was terminated by signal 9" /var/log/clickhouse-server/clickhouse-server.log > /dev/null \
&& echo -e 'OOM killer (or signal 9) in clickhouse-server.log\tFAIL' >> /test_output/test_results.tsv \
|| echo -e 'No OOM messages in clickhouse-server.log\tOK' >> /test_output/test_results.tsv
# Logical errors
zgrep -Fa "Code: 49, e.displayText() = DB::Exception:" /var/log/clickhouse-server/clickhouse-server.log > /dev/null \
&& echo -e 'Logical error thrown (see clickhouse-server.log)\tFAIL' >> /test_output/test_results.tsv \
@ -118,7 +123,7 @@ zgrep -Fa "########################################" /var/log/clickhouse-server/
&& echo -e 'Killed by signal (in clickhouse-server.log)\tFAIL' >> /test_output/test_results.tsv \
|| echo -e 'Not crashed\tOK' >> /test_output/test_results.tsv
# It also checks for OOM or crash without stacktrace (printed by watchdog)
# It also checks for crash without stacktrace (printed by watchdog)
zgrep -Fa " <Fatal> " /var/log/clickhouse-server/clickhouse-server.log > /dev/null \
&& echo -e 'Fatal message in clickhouse-server.log\tFAIL' >> /test_output/test_results.tsv \
|| echo -e 'No fatal messages in clickhouse-server.log\tOK' >> /test_output/test_results.tsv

File diff suppressed because one or more lines are too long

View File

@ -48,7 +48,8 @@ toc_title: Adopters
| <a href="https://www.diva-e.com" class="favicon">Diva-e</a> | Digital consulting | Main Product | — | — | [Slides in English, September 2019](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup29/ClickHouse-MeetUp-Unusual-Applications-sd-2019-09-17.pdf) |
| <a href="https://www.ecwid.com/" class="favicon">Ecwid</a> | E-commerce SaaS | Metrics, Logging | — | — | [Slides in Russian, April 2019](https://nastachku.ru/var/files/1/presentation/backend/2_Backend_6.pdf) |
| <a href="https://www.ebay.com/" class="favicon">eBay</a> | E-commerce | Logs, Metrics and Events | — | — | [Official website, Sep 2020](https://tech.ebayinc.com/engineering/ou-online-analytical-processing/) |
| <a href="https://www.exness.com" class="favicon">Exness</a> | Trading | Metrics, Logging | — | — | [Talk in Russian, May 2019](https://youtu.be/_rpU-TvSfZ8?t=3215) |
| <a href="https://www.exness.com/" class="favicon">Exness</a> | Trading | Metrics, Logging | — | — | [Talk in Russian, May 2019](https://youtu.be/_rpU-TvSfZ8?t=3215) |
| <a href="https://www.eventbunker.io/" class="favicon">EventBunker.io</a> | Serverless Data Processing | — | — | — | [Tweet, April 2021](https://twitter.com/Halil_D_/status/1379839133472985091) |
| <a href="https://fastnetmon.com/" class="favicon">FastNetMon</a> | DDoS Protection | Main Product | | — | [Official website](https://fastnetmon.com/docs-fnm-advanced/fastnetmon-advanced-traffic-persistency/) |
| <a href="https://www.flipkart.com/" class="favicon">Flipkart</a> | e-Commerce | — | — | — | [Talk in English, July 2020](https://youtu.be/GMiXCMFDMow?t=239) |
| <a href="https://fun.co/rp" class="favicon">FunCorp</a> | Games | | — | 14 bn records/day as of Jan 2021 | [Article](https://www.altinity.com/blog/migrating-from-redshift-to-clickhouse) |

View File

@ -233,7 +233,7 @@ JAVA_OPTS="-Xms{{ '{{' }} cluster.get('xms','128M') {{ '}}' }} \
-XX:+PrintSafepointStatistics \
-XX:+UseParNewGC \
-XX:+UseConcMarkSweepGC \
-XX:+CMSParallelRemarkEnabled"
-XX:+CMSParallelRemarkEnabled"
```
Salt init:

View File

@ -649,3 +649,65 @@ Result:
- [List of XML and HTML character entity references](https://en.wikipedia.org/wiki/List_of_XML_and_HTML_character_entity_references)
## extractTextFromHTML {#extracttextfromhtml}
A function to extract text from HTML or XHTML.
It does not necessarily conform 100% to the HTML, XML, or XHTML standards, but the implementation is reasonably accurate and fast. The rules are the following:
1. Comments are skipped. Example: `<!-- test -->`. A comment must end with `-->`. Nested comments are not possible.
Note: constructions like `<!-->` and `<!--->` are not valid comments in HTML, but they are skipped by the other rules.
2. CDATA is pasted verbatim. Note: CDATA is XML/XHTML-specific, but it is processed on a best-effort basis.
3. `script` and `style` elements are removed with all their content. Note: it is assumed that a closing tag cannot appear inside the content. For example, in JS a string literal has to be escaped like `"<\/script>"`.
Note: comments and CDATA are possible inside `script` or `style`; closing tags are then not searched for inside CDATA. Example: `<script><![CDATA[</script>]]></script>`. But they are still searched for inside comments. Sometimes it becomes complicated: `<script>var x = "<!--"; </script> var y = "-->"; alert(x + y);</script>`
Note: `script` and `style` can be the names of XML namespaces; then they are not treated like usual `script` or `style` elements. Example: `<script:a>Hello</script:a>`.
Note: whitespace is allowed after the closing tag name: `</script >`, but not before it: `< / script>`.
4. Other tags or tag-like elements are skipped, but their inner content is kept. Example: `<a>.</a>`
Note: HTML like `<a test=">"></a>` is expected to be invalid.
Note: it also skips tag-like constructs: `<>`, `<!>`, etc.
Note: a tag without an end is skipped up to the end of the input: `<hello `
5. HTML and XML entities are not decoded. They must be processed by a separate function.
6. Whitespace in the text is collapsed or inserted according to specific rules.
- Whitespace at the beginning and at the end is removed.
- Consecutive whitespace characters are collapsed.
- But if the text is separated by other elements and there is no whitespace, it is inserted.
- This may produce unnatural examples: `Hello<b>world</b>`, `Hello<!-- -->world`; there is no whitespace in the HTML, but the function inserts it. Also consider: `Hello<p>world</p>`, `Hello<br>world`. This behavior is reasonable for data analysis, e.g. to convert HTML to a bag of words (see the example after this list).
7. Also note that correct handling of whitespace requires support for `<pre></pre>` and the CSS `display` and `white-space` properties.
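For example (a minimal illustration of rule 6 above, not part of the reference examples below), the function inserts a separating space even though the source HTML contains none:
``` sql
SELECT extractTextFromHTML('Hello<b>world</b>');
```
``` text
Hello world
```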
**Syntax**
``` sql
extractTextFromHTML(x)
```
**Arguments**
- `x` — input text. [String](../../sql-reference/data-types/string.md).
**Returned value**
- Extracted text.
Type: [String](../../sql-reference/data-types/string.md).
**Example**
The first example contains several tags and a comment; it also shows how whitespace is processed.
The second example shows `CDATA` and `script` tag processing.
In the third example, text is extracted from the full HTML response received by the [url](../../sql-reference/table-functions/url.md) function.
Query:
``` sql
SELECT extractTextFromHTML(' <p> A text <i>with</i><b>tags</b>. <!-- comments --> </p> ');
SELECT extractTextFromHTML('<![CDATA[The content within <b>CDATA</b>]]> <script>alert("Script");</script>');
SELECT extractTextFromHTML(html) FROM url('http://www.donothingfor2minutes.com/', RawBLOB, 'html String');
```
Result:
``` text
A text with tags .
The content within <b>CDATA</b>
Do Nothing for 2 Minutes 2:00 &nbsp;
```

View File

@ -79,7 +79,7 @@ The `TTL` is no longer there, so the second row is not deleted:
└───────────────────────┴─────────┴──────────────┘
```
### See Also
**See Also**
- More about the [TTL-expression](../../../sql-reference/statements/create/table.md#ttl-expression).
- Modify column [with TTL](../../../sql-reference/statements/alter/column.md#alter_modify-column).

File diff suppressed because one or more lines are too long

View File

@ -16,4 +16,5 @@ toc_title: "Introduction"
- [AMPLab Big Data Benchmark](amplab-benchmark.md)
- [New York City taxi data](nyc-taxi.md)
- [OnTime](ontime.md)
- [Cell towers](../../getting-started/example-datasets/cell-towers.md)

View File

@ -645,3 +645,66 @@ SELECT decodeXMLComponent('&lt; &#x3A3; &gt;');
- [Character entities in HTML](https://ru.wikipedia.org/wiki/%D0%9C%D0%BD%D0%B5%D0%BC%D0%BE%D0%BD%D0%B8%D0%BA%D0%B8_%D0%B2_HTML)
## extractTextFromHTML {#extracttextfromhtml}
A function for extracting text from HTML or XHTML.
It does not conform 100% to the HTML, XML, or XHTML standards, but the implementation is reasonably accurate and fast. The processing rules are the following:
1. Comments are removed. Example: `<!-- test -->`. A comment must end with `-->`. Nested comments are not allowed.
Note: constructions like `<!-->` and `<!--->` are not valid comments in HTML, but they are removed according to the other rules.
2. CDATA content is pasted verbatim. Note: CDATA is XML/XHTML-specific, but it is always processed on a best-effort basis.
3. The `script` and `style` elements are removed together with all their content. Note: it is assumed that a closing tag cannot appear inside the content. For example, in JS a string literal has to be escaped like `"<\/script>"`.
Note: comments and CDATA are possible inside `script` or `style`; closing tags are then not searched for inside CDATA. Example: `<script><![CDATA[</script>]]></script>`. But they are searched for inside comments. Sometimes complicated cases arise: `<script>var x = "<!--"; </script> var y = "-->"; alert(x + y);</script>`
Note: `script` and `style` can be names of XML namespaces; then they are not treated like usual `script` or `style` elements. Example: `<script:a>Hello</script:a>`.
Note: whitespace is allowed after the closing tag name: `</script >`, but not before it: `< / script>`.
4. Other tags or tag-like elements are removed, while their inner content is kept. Example: `<a>.</a>`
Note: such HTML is expected to be invalid: `<a test=">"></a>`
Note: the function also removes tag-like elements: `<>`, `<!>`, etc.
Note: if a tag has no terminating `>`, the tag and all the text after it are removed: `<hello `
5. HTML and XML entities are not decoded. They must be processed by a separate function.
6. Whitespace in the text is removed or inserted according to the following rules:
- Whitespace at the beginning and at the end of the extracted text is removed.
- Consecutive whitespace characters are collapsed into one space.
- If the text is separated by other removed elements and there is no whitespace at that point, it is inserted.
- This may produce unnatural spellings, for example: `Hello<b>world</b>`, `Hello<!-- -->world`; there is no whitespace in the HTML, but the function inserts it. Also consider variants such as `Hello<p>world</p>`, `Hello<br>world`. Such results can be used for data analysis, e.g. to convert HTML text into a bag of words (see the example after this list).
7. Also note that correct handling of whitespace requires support for `<pre></pre>` and the CSS `display` and `white-space` properties.
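For example (a minimal illustration of rule 6 above, not one of the reference examples below), the function inserts a separating space even though the source HTML contains none:
``` sql
SELECT extractTextFromHTML('Hello<b>world</b>');
```
``` text
Hello world
```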
**Syntax**
``` sql
extractTextFromHTML(x)
```
**Arguments**
- `x` — the text to process. [String](../../sql-reference/data-types/string.md).
**Returned value**
- The extracted text.
Type: [String](../../sql-reference/data-types/string.md).
**Example**
The first example contains several tags and a comment; it also shows how whitespace is processed.
The second example shows `CDATA` and `script` tag processing.
In the third example, text is extracted from a full HTML response received with the [url](../../sql-reference/table-functions/url.md) function.
Query:
``` sql
SELECT extractTextFromHTML(' <p> A text <i>with</i><b>tags</b>. <!-- comments --> </p> ');
SELECT extractTextFromHTML('<![CDATA[The content within <b>CDATA</b>]]> <script>alert("Script");</script>');
SELECT extractTextFromHTML(html) FROM url('http://www.donothingfor2minutes.com/', RawBLOB, 'html String');
```
Result:
``` text
A text with tags .
The content within <b>CDATA</b>
Do Nothing for 2 Minutes 2:00 &nbsp;
```

View File

@ -82,4 +82,4 @@ SELECT * FROM table_with_ttl;
### See Also
- More about the [TTL property](../../../engines/table-engines/mergetree-family/mergetree.md#mergetree-column-ttl).
- Modifying a column [with TTL](../../../sql-reference/statements/alter/column.md#alter_modify-column).

View File

@ -174,6 +174,8 @@ public:
return "mannWhitneyUTest";
}
bool allocatesMemoryInArena() const override { return true; }
DataTypePtr getReturnType() const override
{
DataTypes types
@ -208,7 +210,7 @@ public:
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
auto & a = this->data(place);
auto & b = this->data(rhs);
const auto & b = this->data(rhs);
a.merge(b, arena);
}

View File

@ -58,6 +58,8 @@ public:
return "rankCorr";
}
bool allocatesMemoryInArena() const override { return true; }
DataTypePtr getReturnType() const override
{
return std::make_shared<DataTypeNumber<Float64>>();

View File

@ -128,7 +128,7 @@ private:
template <size_t> friend class AlignedArenaAllocator;
public:
Arena(size_t initial_size_ = 4096, size_t growth_factor_ = 2, size_t linear_growth_threshold_ = 128 * 1024 * 1024)
explicit Arena(size_t initial_size_ = 4096, size_t growth_factor_ = 2, size_t linear_growth_threshold_ = 128 * 1024 * 1024)
: growth_factor(growth_factor_), linear_growth_threshold(linear_growth_threshold_),
head(new MemoryChunk(initial_size_, nullptr)), size_in_bytes(head->size()),
page_size(static_cast<size_t>(::getPageSize()))
@ -160,7 +160,7 @@ public:
void * head_pos = head->pos;
size_t space = head->end - head->pos;
auto res = static_cast<char *>(std::align(alignment, size, head_pos, space));
auto * res = static_cast<char *>(std::align(alignment, size, head_pos, space));
if (res)
{
head->pos = static_cast<char *>(head_pos);

View File

@ -548,6 +548,7 @@
M(578, INVALID_FORMAT_INSERT_QUERY_WITH_DATA) \
M(579, INCORRECT_PART_TYPE) \
M(580, CANNOT_SET_ROUNDING_MODE) \
M(581, TOO_LARGE_DISTRIBUTED_DEPTH) \
\
M(998, POSTGRESQL_CONNECTION_FAILURE) \
M(999, KEEPER_EXCEPTION) \

View File

@ -44,8 +44,8 @@ ChangelogFileDescription getChangelogFileDescription(const std::string & path_st
ChangelogFileDescription result;
result.prefix = filename_parts[0];
result.from_log_index = parse<size_t>(filename_parts[1]);
result.to_log_index = parse<size_t>(filename_parts[2]);
result.from_log_index = parse<uint64_t>(filename_parts[1]);
result.to_log_index = parse<uint64_t>(filename_parts[2]);
result.path = path_str;
return result;
}
@ -73,7 +73,7 @@ Checksum computeRecordChecksum(const ChangelogRecord & record)
class ChangelogWriter
{
public:
ChangelogWriter(const std::string & filepath_, WriteMode mode, size_t start_index_)
ChangelogWriter(const std::string & filepath_, WriteMode mode, uint64_t start_index_)
: filepath(filepath_)
, plain_buf(filepath, DBMS_DEFAULT_BUFFER_SIZE, mode == WriteMode::Rewrite ? -1 : (O_APPEND | O_CREAT | O_WRONLY))
, start_index(start_index_)
@ -115,22 +115,22 @@ public:
plain_buf.sync();
}
size_t getEntriesWritten() const
uint64_t getEntriesWritten() const
{
return entries_written;
}
void setEntriesWritten(size_t entries_written_)
void setEntriesWritten(uint64_t entries_written_)
{
entries_written = entries_written_;
}
size_t getStartIndex() const
uint64_t getStartIndex() const
{
return start_index;
}
void setStartIndex(size_t start_index_)
void setStartIndex(uint64_t start_index_)
{
start_index = start_index_;
}
@ -138,14 +138,14 @@ public:
private:
std::string filepath;
WriteBufferFromFile plain_buf;
size_t entries_written = 0;
size_t start_index;
uint64_t entries_written = 0;
uint64_t start_index;
};
struct ChangelogReadResult
{
size_t entries_read;
size_t first_read_index;
uint64_t entries_read;
uint64_t first_read_index;
off_t last_position;
bool error;
};
@ -158,9 +158,9 @@ public:
, read_buf(filepath)
{}
ChangelogReadResult readChangelog(IndexToLogEntry & logs, size_t start_log_index, IndexToOffset & index_to_offset, Poco::Logger * log)
ChangelogReadResult readChangelog(IndexToLogEntry & logs, uint64_t start_log_index, IndexToOffset & index_to_offset, Poco::Logger * log)
{
size_t previous_index = 0;
uint64_t previous_index = 0;
ChangelogReadResult result{};
try
{
@ -247,7 +247,7 @@ private:
ReadBufferFromFile read_buf;
};
Changelog::Changelog(const std::string & changelogs_dir_, size_t rotate_interval_, Poco::Logger * log_)
Changelog::Changelog(const std::string & changelogs_dir_, uint64_t rotate_interval_, Poco::Logger * log_)
: changelogs_dir(changelogs_dir_)
, rotate_interval(rotate_interval_)
, log(log_)
@ -263,15 +263,15 @@ Changelog::Changelog(const std::string & changelogs_dir_, size_t rotate_interval
}
}
void Changelog::readChangelogAndInitWriter(size_t last_commited_log_index, size_t logs_to_keep)
void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uint64_t logs_to_keep)
{
size_t total_read = 0;
size_t entries_in_last = 0;
size_t incomplete_log_index = 0;
uint64_t total_read = 0;
uint64_t entries_in_last = 0;
uint64_t incomplete_log_index = 0;
ChangelogReadResult result{};
size_t first_read_index = 0;
uint64_t first_read_index = 0;
size_t start_to_read_from = last_commited_log_index;
uint64_t start_to_read_from = last_commited_log_index;
if (start_to_read_from > logs_to_keep)
start_to_read_from -= logs_to_keep;
else
@ -355,7 +355,7 @@ void Changelog::readChangelogAndInitWriter(size_t last_commited_log_index, size_
rotate(start_index + total_read);
}
void Changelog::rotate(size_t new_start_log_index)
void Changelog::rotate(uint64_t new_start_log_index)
{
ChangelogFileDescription new_description;
new_description.prefix = DEFAULT_PREFIX;
@ -369,7 +369,7 @@ void Changelog::rotate(size_t new_start_log_index)
current_writer = std::make_unique<ChangelogWriter>(new_description.path, WriteMode::Rewrite, new_start_log_index);
}
ChangelogRecord Changelog::buildRecord(size_t index, const LogEntryPtr & log_entry)
ChangelogRecord Changelog::buildRecord(uint64_t index, const LogEntryPtr & log_entry)
{
ChangelogRecord record;
record.header.version = ChangelogVersion::V0;
@ -387,7 +387,7 @@ ChangelogRecord Changelog::buildRecord(size_t index, const LogEntryPtr & log_ent
return record;
}
void Changelog::appendEntry(size_t index, const LogEntryPtr & log_entry, bool force_sync)
void Changelog::appendEntry(uint64_t index, const LogEntryPtr & log_entry, bool force_sync)
{
if (!current_writer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Changelog must be initialized before appending records");
@ -405,7 +405,7 @@ void Changelog::appendEntry(size_t index, const LogEntryPtr & log_entry, bool fo
logs[index] = makeClone(log_entry);
}
void Changelog::writeAt(size_t index, const LogEntryPtr & log_entry, bool force_sync)
void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry, bool force_sync)
{
if (index_to_start_pos.count(index) == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot write at index {} because changelog doesn't contain it", index);
@ -439,7 +439,7 @@ void Changelog::writeAt(size_t index, const LogEntryPtr & log_entry, bool force_
}
/// Remove redundant logs from memory
for (size_t i = index; ; ++i)
for (uint64_t i = index; ; ++i)
{
auto log_itr = logs.find(i);
if (log_itr == logs.end())
@ -454,7 +454,7 @@ void Changelog::writeAt(size_t index, const LogEntryPtr & log_entry, bool force_
appendEntry(index, log_entry, force_sync);
}
void Changelog::compact(size_t up_to_log_index)
void Changelog::compact(uint64_t up_to_log_index)
{
for (auto itr = existing_changelogs.begin(); itr != existing_changelogs.end();)
{
@ -476,9 +476,9 @@ void Changelog::compact(size_t up_to_log_index)
LogEntryPtr Changelog::getLastEntry() const
{
static LogEntryPtr fake_entry = nuraft::cs_new<nuraft::log_entry>(0, nuraft::buffer::alloc(sizeof(size_t)));
static LogEntryPtr fake_entry = nuraft::cs_new<nuraft::log_entry>(0, nuraft::buffer::alloc(sizeof(uint64_t)));
size_t next_index = getNextEntryIndex() - 1;
uint64_t next_index = getNextEntryIndex() - 1;
auto entry = logs.find(next_index);
if (entry == logs.end())
return fake_entry;
@ -486,13 +486,13 @@ LogEntryPtr Changelog::getLastEntry() const
return entry->second;
}
LogEntriesPtr Changelog::getLogEntriesBetween(size_t start, size_t end)
LogEntriesPtr Changelog::getLogEntriesBetween(uint64_t start, uint64_t end)
{
LogEntriesPtr ret = nuraft::cs_new<std::vector<nuraft::ptr<nuraft::log_entry>>>();
ret->resize(end - start);
size_t result_pos = 0;
for (size_t i = start; i < end; ++i)
uint64_t result_pos = 0;
for (uint64_t i = start; i < end; ++i)
{
(*ret)[result_pos] = entryAt(i);
result_pos++;
@ -500,7 +500,7 @@ LogEntriesPtr Changelog::getLogEntriesBetween(size_t start, size_t end)
return ret;
}
LogEntryPtr Changelog::entryAt(size_t index)
LogEntryPtr Changelog::entryAt(uint64_t index)
{
nuraft::ptr<nuraft::log_entry> src = nullptr;
auto entry = logs.find(index);
@ -511,12 +511,12 @@ LogEntryPtr Changelog::entryAt(size_t index)
return src;
}
nuraft::ptr<nuraft::buffer> Changelog::serializeEntriesToBuffer(size_t index, int32_t count)
nuraft::ptr<nuraft::buffer> Changelog::serializeEntriesToBuffer(uint64_t index, int32_t count)
{
std::vector<nuraft::ptr<nuraft::buffer>> returned_logs;
size_t size_total = 0;
for (size_t i = index; i < index + count; ++i)
uint64_t size_total = 0;
for (uint64_t i = index; i < index + count; ++i)
{
auto entry = logs.find(i);
if (entry == logs.end())
@ -540,14 +540,14 @@ nuraft::ptr<nuraft::buffer> Changelog::serializeEntriesToBuffer(size_t index, in
return buf_out;
}
void Changelog::applyEntriesFromBuffer(size_t index, nuraft::buffer & buffer, bool force_sync)
void Changelog::applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer, bool force_sync)
{
buffer.pos(0);
int num_logs = buffer.get_int();
for (int i = 0; i < num_logs; ++i)
{
size_t cur_index = index + i;
uint64_t cur_index = index + i;
int buf_size = buffer.get_int();
nuraft::ptr<nuraft::buffer> buf_local = nuraft::buffer::alloc(buf_size);

View File

@ -17,8 +17,8 @@ using LogEntries = std::vector<LogEntryPtr>;
using LogEntriesPtr = nuraft::ptr<LogEntries>;
using BufferPtr = nuraft::ptr<nuraft::buffer>;
using IndexToOffset = std::unordered_map<size_t, off_t>;
using IndexToLogEntry = std::unordered_map<size_t, LogEntryPtr>;
using IndexToOffset = std::unordered_map<uint64_t, off_t>;
using IndexToLogEntry = std::unordered_map<uint64_t, LogEntryPtr>;
enum class ChangelogVersion : uint8_t
{
@ -30,10 +30,10 @@ static constexpr auto CURRENT_CHANGELOG_VERSION = ChangelogVersion::V0;
struct ChangelogRecordHeader
{
ChangelogVersion version = CURRENT_CHANGELOG_VERSION;
size_t index; /// entry log number
size_t term;
uint64_t index; /// entry log number
uint64_t term;
nuraft::log_val_type value_type;
size_t blob_size;
uint64_t blob_size;
};
/// Changelog record on disk
@ -48,8 +48,8 @@ struct ChangelogRecord
struct ChangelogFileDescription
{
std::string prefix;
size_t from_log_index;
size_t to_log_index;
uint64_t from_log_index;
uint64_t to_log_index;
std::string path;
};
@ -63,27 +63,27 @@ class Changelog
{
public:
Changelog(const std::string & changelogs_dir_, size_t rotate_interval_, Poco::Logger * log_);
Changelog(const std::string & changelogs_dir_, uint64_t rotate_interval_, Poco::Logger * log_);
/// Read changelog from files on changelogs_dir_ skipping all entries before from_log_index
/// Truncate broken entries, remove files after broken entries.
void readChangelogAndInitWriter(size_t last_commited_log_index, size_t logs_to_keep);
void readChangelogAndInitWriter(uint64_t last_commited_log_index, uint64_t logs_to_keep);
/// Add entry to log with index. Call fsync if force_sync true.
void appendEntry(size_t index, const LogEntryPtr & log_entry, bool force_sync);
void appendEntry(uint64_t index, const LogEntryPtr & log_entry, bool force_sync);
/// Write entry at index and truncate all subsequent entries.
void writeAt(size_t index, const LogEntryPtr & log_entry, bool force_sync);
void writeAt(uint64_t index, const LogEntryPtr & log_entry, bool force_sync);
/// Remove log files with to_log_index <= up_to_log_index.
void compact(size_t up_to_log_index);
void compact(uint64_t up_to_log_index);
size_t getNextEntryIndex() const
uint64_t getNextEntryIndex() const
{
return start_index + logs.size();
}
size_t getStartIndex() const
uint64_t getStartIndex() const
{
return start_index;
}
@ -92,21 +92,21 @@ public:
LogEntryPtr getLastEntry() const;
/// Return log entries between [start, end)
LogEntriesPtr getLogEntriesBetween(size_t start_index, size_t end_index);
LogEntriesPtr getLogEntriesBetween(uint64_t start_index, uint64_t end_index);
/// Return entry at position index
LogEntryPtr entryAt(size_t index);
LogEntryPtr entryAt(uint64_t index);
/// Serialize entries from index into buffer
BufferPtr serializeEntriesToBuffer(size_t index, int32_t count);
BufferPtr serializeEntriesToBuffer(uint64_t index, int32_t count);
/// Apply entries from buffer overriding existing entries
void applyEntriesFromBuffer(size_t index, nuraft::buffer & buffer, bool force_sync);
void applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer, bool force_sync);
/// Fsync log to disk
void flush();
size_t size() const
uint64_t size() const
{
return logs.size();
}
@ -116,21 +116,21 @@ public:
private:
/// Pack log_entry into changelog record
static ChangelogRecord buildRecord(size_t index, const LogEntryPtr & log_entry);
static ChangelogRecord buildRecord(uint64_t index, const LogEntryPtr & log_entry);
/// Starts new file [new_start_log_index, new_start_log_index + rotate_interval]
void rotate(size_t new_start_log_index);
void rotate(uint64_t new_start_log_index);
private:
const std::string changelogs_dir;
const size_t rotate_interval;
const uint64_t rotate_interval;
Poco::Logger * log;
std::map<size_t, ChangelogFileDescription> existing_changelogs;
std::map<uint64_t, ChangelogFileDescription> existing_changelogs;
std::unique_ptr<ChangelogWriter> current_writer;
IndexToOffset index_to_start_pos;
IndexToLogEntry logs;
size_t start_index = 0;
uint64_t start_index = 0;
};
}

View File

@ -16,16 +16,16 @@ ptr<log_entry> makeClone(const ptr<log_entry> & entry)
InMemoryLogStore::InMemoryLogStore()
: start_idx(1)
{
nuraft::ptr<nuraft::buffer> buf = nuraft::buffer::alloc(sizeof(size_t));
nuraft::ptr<nuraft::buffer> buf = nuraft::buffer::alloc(sizeof(uint64_t));
logs[0] = nuraft::cs_new<nuraft::log_entry>(0, buf);
}
size_t InMemoryLogStore::start_index() const
uint64_t InMemoryLogStore::start_index() const
{
return start_idx;
}
size_t InMemoryLogStore::next_slot() const
uint64_t InMemoryLogStore::next_slot() const
{
std::lock_guard<std::mutex> l(logs_lock);
// Exclude the dummy entry.
@ -34,7 +34,7 @@ size_t InMemoryLogStore::next_slot() const
nuraft::ptr<nuraft::log_entry> InMemoryLogStore::last_entry() const
{
size_t next_idx = next_slot();
uint64_t next_idx = next_slot();
std::lock_guard<std::mutex> lock(logs_lock);
auto entry = logs.find(next_idx - 1);
if (entry == logs.end())
@ -43,17 +43,17 @@ nuraft::ptr<nuraft::log_entry> InMemoryLogStore::last_entry() const
return makeClone(entry->second);
}
size_t InMemoryLogStore::append(nuraft::ptr<nuraft::log_entry> & entry)
uint64_t InMemoryLogStore::append(nuraft::ptr<nuraft::log_entry> & entry)
{
ptr<log_entry> clone = makeClone(entry);
std::lock_guard<std::mutex> l(logs_lock);
size_t idx = start_idx + logs.size() - 1;
uint64_t idx = start_idx + logs.size() - 1;
logs[idx] = clone;
return idx;
}
void InMemoryLogStore::write_at(size_t index, nuraft::ptr<nuraft::log_entry> & entry)
void InMemoryLogStore::write_at(uint64_t index, nuraft::ptr<nuraft::log_entry> & entry)
{
nuraft::ptr<log_entry> clone = makeClone(entry);
@ -65,14 +65,14 @@ void InMemoryLogStore::write_at(size_t index, nuraft::ptr<nuraft::log_entry> & e
logs[index] = clone;
}
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> InMemoryLogStore::log_entries(size_t start, size_t end)
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> InMemoryLogStore::log_entries(uint64_t start, uint64_t end)
{
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> ret =
nuraft::cs_new<std::vector<nuraft::ptr<nuraft::log_entry>>>();
ret->resize(end - start);
size_t cc = 0;
for (size_t i = start; i < end; ++i)
uint64_t cc = 0;
for (uint64_t i = start; i < end; ++i)
{
nuraft::ptr<nuraft::log_entry> src = nullptr;
{
@ -90,7 +90,7 @@ nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> InMemoryLogStore::log_e
return ret;
}
nuraft::ptr<nuraft::log_entry> InMemoryLogStore::entry_at(size_t index)
nuraft::ptr<nuraft::log_entry> InMemoryLogStore::entry_at(uint64_t index)
{
nuraft::ptr<nuraft::log_entry> src = nullptr;
{
@ -103,9 +103,9 @@ nuraft::ptr<nuraft::log_entry> InMemoryLogStore::entry_at(size_t index)
return makeClone(src);
}
size_t InMemoryLogStore::term_at(size_t index)
uint64_t InMemoryLogStore::term_at(uint64_t index)
{
size_t term = 0;
uint64_t term = 0;
{
std::lock_guard<std::mutex> l(logs_lock);
auto entry = logs.find(index);
@ -116,12 +116,12 @@ size_t InMemoryLogStore::term_at(size_t index)
return term;
}
nuraft::ptr<nuraft::buffer> InMemoryLogStore::pack(size_t index, Int32 cnt)
nuraft::ptr<nuraft::buffer> InMemoryLogStore::pack(uint64_t index, Int32 cnt)
{
std::vector<nuraft::ptr<nuraft::buffer>> returned_logs;
size_t size_total = 0;
for (size_t ii = index; ii < index + cnt; ++ii)
uint64_t uint64_total = 0;
for (uint64_t ii = index; ii < index + cnt; ++ii)
{
ptr<log_entry> le = nullptr;
{
@ -130,11 +130,11 @@ nuraft::ptr<nuraft::buffer> InMemoryLogStore::pack(size_t index, Int32 cnt)
}
assert(le.get());
nuraft::ptr<nuraft::buffer> buf = le->serialize();
size_total += buf->size();
uint64_total += buf->size();
returned_logs.push_back(buf);
}
nuraft::ptr<buffer> buf_out = nuraft::buffer::alloc(sizeof(int32) + cnt * sizeof(int32) + size_total);
nuraft::ptr<buffer> buf_out = nuraft::buffer::alloc(sizeof(int32) + cnt * sizeof(int32) + uint64_total);
buf_out->pos(0);
buf_out->put(static_cast<Int32>(cnt));
@ -147,14 +147,14 @@ nuraft::ptr<nuraft::buffer> InMemoryLogStore::pack(size_t index, Int32 cnt)
return buf_out;
}
void InMemoryLogStore::apply_pack(size_t index, nuraft::buffer & pack)
void InMemoryLogStore::apply_pack(uint64_t index, nuraft::buffer & pack)
{
pack.pos(0);
Int32 num_logs = pack.get_int();
for (Int32 i = 0; i < num_logs; ++i)
{
size_t cur_idx = index + i;
uint64_t cur_idx = index + i;
Int32 buf_size = pack.get_int();
nuraft::ptr<nuraft::buffer> buf_local = nuraft::buffer::alloc(buf_size);
@ -177,10 +177,10 @@ void InMemoryLogStore::apply_pack(size_t index, nuraft::buffer & pack)
}
}
bool InMemoryLogStore::compact(size_t last_log_index)
bool InMemoryLogStore::compact(uint64_t last_log_index)
{
std::lock_guard<std::mutex> l(logs_lock);
for (size_t ii = start_idx; ii <= last_log_index; ++ii)
for (uint64_t ii = start_idx; ii <= last_log_index; ++ii)
{
auto entry = logs.find(ii);
if (entry != logs.end())

View File

@ -14,34 +14,34 @@ class InMemoryLogStore : public nuraft::log_store
public:
InMemoryLogStore();
size_t start_index() const override;
uint64_t start_index() const override;
size_t next_slot() const override;
uint64_t next_slot() const override;
nuraft::ptr<nuraft::log_entry> last_entry() const override;
size_t append(nuraft::ptr<nuraft::log_entry> & entry) override;
uint64_t append(nuraft::ptr<nuraft::log_entry> & entry) override;
void write_at(size_t index, nuraft::ptr<nuraft::log_entry> & entry) override;
void write_at(uint64_t index, nuraft::ptr<nuraft::log_entry> & entry) override;
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> log_entries(size_t start, size_t end) override;
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> log_entries(uint64_t start, uint64_t end) override;
nuraft::ptr<nuraft::log_entry> entry_at(size_t index) override;
nuraft::ptr<nuraft::log_entry> entry_at(uint64_t index) override;
size_t term_at(size_t index) override;
uint64_t term_at(uint64_t index) override;
nuraft::ptr<nuraft::buffer> pack(size_t index, Int32 cnt) override;
nuraft::ptr<nuraft::buffer> pack(uint64_t index, Int32 cnt) override;
void apply_pack(size_t index, nuraft::buffer & pack) override;
void apply_pack(uint64_t index, nuraft::buffer & pack) override;
bool compact(size_t last_log_index) override;
bool compact(uint64_t last_log_index) override;
bool flush() override { return true; }
private:
std::map<size_t, nuraft::ptr<nuraft::log_entry>> logs;
std::map<uint64_t, nuraft::ptr<nuraft::log_entry>> logs;
mutable std::mutex logs_lock;
std::atomic<size_t> start_idx;
std::atomic<uint64_t> start_idx;
};
}

View File

@ -3,26 +3,26 @@
namespace DB
{
KeeperLogStore::KeeperLogStore(const std::string & changelogs_path, size_t rotate_interval_, bool force_sync_)
KeeperLogStore::KeeperLogStore(const std::string & changelogs_path, uint64_t rotate_interval_, bool force_sync_)
: log(&Poco::Logger::get("KeeperLogStore"))
, changelog(changelogs_path, rotate_interval_, log)
, force_sync(force_sync_)
{
}
size_t KeeperLogStore::start_index() const
uint64_t KeeperLogStore::start_index() const
{
std::lock_guard lock(changelog_lock);
return changelog.getStartIndex();
}
void KeeperLogStore::init(size_t last_commited_log_index, size_t logs_to_keep)
void KeeperLogStore::init(uint64_t last_commited_log_index, uint64_t logs_to_keep)
{
std::lock_guard lock(changelog_lock);
changelog.readChangelogAndInitWriter(last_commited_log_index, logs_to_keep);
}
size_t KeeperLogStore::next_slot() const
uint64_t KeeperLogStore::next_slot() const
{
std::lock_guard lock(changelog_lock);
return changelog.getNextEntryIndex();
@ -34,34 +34,34 @@ nuraft::ptr<nuraft::log_entry> KeeperLogStore::last_entry() const
return changelog.getLastEntry();
}
size_t KeeperLogStore::append(nuraft::ptr<nuraft::log_entry> & entry)
uint64_t KeeperLogStore::append(nuraft::ptr<nuraft::log_entry> & entry)
{
std::lock_guard lock(changelog_lock);
size_t idx = changelog.getNextEntryIndex();
uint64_t idx = changelog.getNextEntryIndex();
changelog.appendEntry(idx, entry, force_sync);
return idx;
}
void KeeperLogStore::write_at(size_t index, nuraft::ptr<nuraft::log_entry> & entry)
void KeeperLogStore::write_at(uint64_t index, nuraft::ptr<nuraft::log_entry> & entry)
{
std::lock_guard lock(changelog_lock);
changelog.writeAt(index, entry, force_sync);
}
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> KeeperLogStore::log_entries(size_t start, size_t end)
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> KeeperLogStore::log_entries(uint64_t start, uint64_t end)
{
std::lock_guard lock(changelog_lock);
return changelog.getLogEntriesBetween(start, end);
}
nuraft::ptr<nuraft::log_entry> KeeperLogStore::entry_at(size_t index)
nuraft::ptr<nuraft::log_entry> KeeperLogStore::entry_at(uint64_t index)
{
std::lock_guard lock(changelog_lock);
return changelog.entryAt(index);
}
size_t KeeperLogStore::term_at(size_t index)
uint64_t KeeperLogStore::term_at(uint64_t index)
{
std::lock_guard lock(changelog_lock);
auto entry = changelog.entryAt(index);
@ -70,13 +70,13 @@ size_t KeeperLogStore::term_at(size_t index)
return 0;
}
nuraft::ptr<nuraft::buffer> KeeperLogStore::pack(size_t index, int32_t cnt)
nuraft::ptr<nuraft::buffer> KeeperLogStore::pack(uint64_t index, int32_t cnt)
{
std::lock_guard lock(changelog_lock);
return changelog.serializeEntriesToBuffer(index, cnt);
}
bool KeeperLogStore::compact(size_t last_log_index)
bool KeeperLogStore::compact(uint64_t last_log_index)
{
std::lock_guard lock(changelog_lock);
changelog.compact(last_log_index);
@ -90,13 +90,13 @@ bool KeeperLogStore::flush()
return true;
}
void KeeperLogStore::apply_pack(size_t index, nuraft::buffer & pack)
void KeeperLogStore::apply_pack(uint64_t index, nuraft::buffer & pack)
{
std::lock_guard lock(changelog_lock);
changelog.applyEntriesFromBuffer(index, pack, force_sync);
}
size_t KeeperLogStore::size() const
uint64_t KeeperLogStore::size() const
{
std::lock_guard lock(changelog_lock);
return changelog.size();

View File

@ -12,35 +12,35 @@ namespace DB
class KeeperLogStore : public nuraft::log_store
{
public:
KeeperLogStore(const std::string & changelogs_path, size_t rotate_interval_, bool force_sync_);
KeeperLogStore(const std::string & changelogs_path, uint64_t rotate_interval_, bool force_sync_);
void init(size_t last_commited_log_index, size_t logs_to_keep);
void init(uint64_t last_commited_log_index, uint64_t logs_to_keep);
size_t start_index() const override;
uint64_t start_index() const override;
size_t next_slot() const override;
uint64_t next_slot() const override;
nuraft::ptr<nuraft::log_entry> last_entry() const override;
size_t append(nuraft::ptr<nuraft::log_entry> & entry) override;
uint64_t append(nuraft::ptr<nuraft::log_entry> & entry) override;
void write_at(size_t index, nuraft::ptr<nuraft::log_entry> & entry) override;
void write_at(uint64_t index, nuraft::ptr<nuraft::log_entry> & entry) override;
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> log_entries(size_t start, size_t end) override;
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> log_entries(uint64_t start, uint64_t end) override;
nuraft::ptr<nuraft::log_entry> entry_at(size_t index) override;
nuraft::ptr<nuraft::log_entry> entry_at(uint64_t index) override;
size_t term_at(size_t index) override;
uint64_t term_at(uint64_t index) override;
nuraft::ptr<nuraft::buffer> pack(size_t index, int32_t cnt) override;
nuraft::ptr<nuraft::buffer> pack(uint64_t index, int32_t cnt) override;
void apply_pack(size_t index, nuraft::buffer & pack) override;
void apply_pack(uint64_t index, nuraft::buffer & pack) override;
bool compact(size_t last_log_index) override;
bool compact(uint64_t last_log_index) override;
bool flush() override;
size_t size() const;
uint64_t size() const;
private:
mutable std::mutex changelog_lock;

View File

@ -23,16 +23,16 @@ namespace ErrorCodes
namespace
{
size_t getSnapshotPathUpToLogIdx(const String & snapshot_path)
uint64_t getSnapshotPathUpToLogIdx(const String & snapshot_path)
{
std::filesystem::path path(snapshot_path);
std::string filename = path.stem();
Strings name_parts;
splitInto<'_'>(name_parts, filename);
return parse<size_t>(name_parts[1]);
return parse<uint64_t>(name_parts[1]);
}
std::string getSnapshotFileName(size_t up_to_log_idx)
std::string getSnapshotFileName(uint64_t up_to_log_idx)
{
return std::string{"snapshot_"} + std::to_string(up_to_log_idx) + ".bin";
}
@ -214,7 +214,7 @@ SnapshotMetadataPtr KeeperStorageSnapshot::deserialize(KeeperStorage & storage,
return result;
}
KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, size_t up_to_log_idx_)
KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t up_to_log_idx_)
: storage(storage_)
, snapshot_meta(std::make_shared<SnapshotMetadata>(up_to_log_idx_, 0, std::make_shared<nuraft::cluster_config>()))
, session_id(storage->session_id_counter)
@ -266,7 +266,7 @@ KeeperSnapshotManager::KeeperSnapshotManager(const std::string & snapshots_path_
}
std::string KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffer & buffer, size_t up_to_log_idx)
std::string KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx)
{
ReadBufferFromNuraftBuffer reader(buffer);
@ -307,7 +307,7 @@ nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeLatestSnapshotBuff
return nullptr;
}
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeSnapshotBufferFromDisk(size_t up_to_log_idx) const
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeSnapshotBufferFromDisk(uint64_t up_to_log_idx) const
{
const std::string & snapshot_path = existing_snapshots.at(up_to_log_idx);
WriteBufferFromNuraftBuffer writer;
@ -352,7 +352,7 @@ void KeeperSnapshotManager::removeOutdatedSnapshotsIfNeeded()
removeSnapshot(existing_snapshots.begin()->first);
}
void KeeperSnapshotManager::removeSnapshot(size_t log_idx)
void KeeperSnapshotManager::removeSnapshot(uint64_t log_idx)
{
auto itr = existing_snapshots.find(log_idx);
if (itr == existing_snapshots.end())

View File

@ -18,7 +18,7 @@ enum SnapshotVersion : uint8_t
struct KeeperStorageSnapshot
{
public:
KeeperStorageSnapshot(KeeperStorage * storage_, size_t up_to_log_idx_);
KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t up_to_log_idx_);
KeeperStorageSnapshot(KeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_);
~KeeperStorageSnapshot();
@ -51,14 +51,14 @@ public:
SnapshotMetaAndStorage restoreFromLatestSnapshot();
static nuraft::ptr<nuraft::buffer> serializeSnapshotToBuffer(const KeeperStorageSnapshot & snapshot);
std::string serializeSnapshotBufferToDisk(nuraft::buffer & buffer, size_t up_to_log_idx);
std::string serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx);
SnapshotMetaAndStorage deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const;
nuraft::ptr<nuraft::buffer> deserializeSnapshotBufferFromDisk(size_t up_to_log_idx) const;
nuraft::ptr<nuraft::buffer> deserializeSnapshotBufferFromDisk(uint64_t up_to_log_idx) const;
nuraft::ptr<nuraft::buffer> deserializeLatestSnapshotBufferFromDisk();
void removeSnapshot(size_t log_idx);
void removeSnapshot(uint64_t log_idx);
size_t totalSnapshots() const
{
@ -76,7 +76,7 @@ private:
void removeOutdatedSnapshotsIfNeeded();
const std::string snapshots_path;
const size_t snapshots_to_keep;
std::map<size_t, std::string> existing_snapshots;
std::map<uint64_t, std::string> existing_snapshots;
size_t storage_tick_time;
};

View File

@ -54,7 +54,7 @@ void KeeperStateMachine::init()
bool has_snapshots = snapshot_manager.totalSnapshots() != 0;
while (snapshot_manager.totalSnapshots() != 0)
{
size_t latest_log_index = snapshot_manager.getLatestSnapshotIndex();
uint64_t latest_log_index = snapshot_manager.getLatestSnapshotIndex();
LOG_DEBUG(log, "Trying to load state machine from snapshot up to log index {}", latest_log_index);
try
@ -88,7 +88,7 @@ void KeeperStateMachine::init()
storage = std::make_unique<KeeperStorage>(coordination_settings->dead_session_check_period_ms.totalMilliseconds());
}
nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const size_t log_idx, nuraft::buffer & data)
nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data)
{
if (data.size() == sizeof(int64_t))
{
@ -205,7 +205,7 @@ void KeeperStateMachine::create_snapshot(
void KeeperStateMachine::save_logical_snp_obj(
nuraft::snapshot & s,
size_t & obj_id,
uint64_t & obj_id,
nuraft::buffer & data,
bool /*is_first_obj*/,
bool /*is_last_obj*/)
@ -246,7 +246,7 @@ void KeeperStateMachine::save_logical_snp_obj(
int KeeperStateMachine::read_logical_snp_obj(
nuraft::snapshot & s,
void* & /*user_snp_ctx*/,
ulong obj_id,
uint64_t obj_id,
nuraft::ptr<nuraft::buffer> & data_out,
bool & is_last_obj)
{

View File

@ -20,13 +20,13 @@ public:
void init();
nuraft::ptr<nuraft::buffer> pre_commit(const size_t /*log_idx*/, nuraft::buffer & /*data*/) override { return nullptr; }
nuraft::ptr<nuraft::buffer> pre_commit(const uint64_t /*log_idx*/, nuraft::buffer & /*data*/) override { return nullptr; }
nuraft::ptr<nuraft::buffer> commit(const size_t log_idx, nuraft::buffer & data) override;
nuraft::ptr<nuraft::buffer> commit(const uint64_t log_idx, nuraft::buffer & data) override;
void rollback(const size_t /*log_idx*/, nuraft::buffer & /*data*/) override {}
void rollback(const uint64_t /*log_idx*/, nuraft::buffer & /*data*/) override {}
size_t last_commit_index() override { return last_committed_idx; }
uint64_t last_commit_index() override { return last_committed_idx; }
bool apply_snapshot(nuraft::snapshot & s) override;
@ -38,7 +38,7 @@ public:
void save_logical_snp_obj(
nuraft::snapshot & s,
size_t & obj_id,
uint64_t & obj_id,
nuraft::buffer & data,
bool is_first_obj,
bool is_last_obj) override;
@ -46,7 +46,7 @@ public:
int read_logical_snp_obj(
nuraft::snapshot & s,
void* & user_snp_ctx,
ulong obj_id,
uint64_t obj_id,
nuraft::ptr<nuraft::buffer> & data_out,
bool & is_last_obj) override;
@ -82,7 +82,7 @@ private:
std::mutex storage_lock;
/// Last committed Raft log number.
std::atomic<size_t> last_committed_idx;
std::atomic<uint64_t> last_committed_idx;
Poco::Logger * log;
};

View File

@ -64,7 +64,7 @@ KeeperStateManager::KeeperStateManager(
throw Exception(ErrorCodes::RAFT_ERROR, "At least one of servers should be able to start as leader (without <start_as_follower>)");
}
void KeeperStateManager::loadLogStore(size_t last_commited_index, size_t logs_to_keep)
void KeeperStateManager::loadLogStore(uint64_t last_commited_index, uint64_t logs_to_keep)
{
log_store->init(last_commited_index, logs_to_keep);
}

View File

@ -25,7 +25,7 @@ public:
int port,
const std::string & logs_path);
void loadLogStore(size_t last_commited_index, size_t logs_to_keep);
void loadLogStore(uint64_t last_commited_index, uint64_t logs_to_keep);
void flushLogStore();
@ -54,12 +54,12 @@ public:
nuraft::ptr<KeeperLogStore> getLogStore() const { return log_store; }
size_t getTotalServers() const { return total_servers; }
uint64_t getTotalServers() const { return total_servers; }
private:
int my_server_id;
int my_port;
size_t total_servers{0};
uint64_t total_servers{0};
std::unordered_set<int> start_as_follower_servers;
nuraft::ptr<KeeperLogStore> log_store;
nuraft::ptr<nuraft::srv_config> my_server_config;

View File

@ -21,7 +21,7 @@ SummingStateMachine::SummingStateMachine()
{
}
nuraft::ptr<nuraft::buffer> SummingStateMachine::commit(const size_t log_idx, nuraft::buffer & data)
nuraft::ptr<nuraft::buffer> SummingStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data)
{
int64_t value_to_add = deserializeValue(data);
@ -84,7 +84,7 @@ void SummingStateMachine::createSnapshotInternal(nuraft::snapshot & s)
void SummingStateMachine::save_logical_snp_obj(
nuraft::snapshot & s,
size_t & obj_id,
uint64_t & obj_id,
nuraft::buffer & data,
bool /*is_first_obj*/,
bool /*is_last_obj*/)
@ -112,7 +112,7 @@ void SummingStateMachine::save_logical_snp_obj(
int SummingStateMachine::read_logical_snp_obj(
nuraft::snapshot & s,
void* & /*user_snp_ctx*/,
size_t obj_id,
uint64_t obj_id,
nuraft::ptr<nuraft::buffer> & data_out,
bool & is_last_obj)
{
@ -142,7 +142,7 @@ int SummingStateMachine::read_logical_snp_obj(
else
{
// Object ID > 0: second object, put actual value.
data_out = nuraft::buffer::alloc(sizeof(size_t));
data_out = nuraft::buffer::alloc(sizeof(uint64_t));
nuraft::buffer_serializer bs(data_out);
bs.put_u64(ctx->value);
is_last_obj = true;

View File

@ -15,13 +15,13 @@ class SummingStateMachine : public nuraft::state_machine
public:
SummingStateMachine();
nuraft::ptr<nuraft::buffer> pre_commit(const size_t /*log_idx*/, nuraft::buffer & /*data*/) override { return nullptr; }
nuraft::ptr<nuraft::buffer> pre_commit(const uint64_t /*log_idx*/, nuraft::buffer & /*data*/) override { return nullptr; }
nuraft::ptr<nuraft::buffer> commit(const size_t log_idx, nuraft::buffer & data) override;
nuraft::ptr<nuraft::buffer> commit(const uint64_t log_idx, nuraft::buffer & data) override;
void rollback(const size_t /*log_idx*/, nuraft::buffer & /*data*/) override {}
void rollback(const uint64_t /*log_idx*/, nuraft::buffer & /*data*/) override {}
size_t last_commit_index() override { return last_committed_idx; }
uint64_t last_commit_index() override { return last_committed_idx; }
bool apply_snapshot(nuraft::snapshot & s) override;
@ -33,7 +33,7 @@ public:
void save_logical_snp_obj(
nuraft::snapshot & s,
size_t & obj_id,
uint64_t & obj_id,
nuraft::buffer & data,
bool is_first_obj,
bool is_last_obj) override;
@ -41,7 +41,7 @@ public:
int read_logical_snp_obj(
nuraft::snapshot & s,
void* & user_snp_ctx,
size_t obj_id,
uint64_t obj_id,
nuraft::ptr<nuraft::buffer> & data_out,
bool & is_last_obj) override;

View File

@ -1085,7 +1085,7 @@ nuraft::ptr<nuraft::log_entry> getLogEntryFromZKRequest(size_t term, int64_t ses
return nuraft::cs_new<nuraft::log_entry>(term, buffer);
}
void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, size_t total_logs)
void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint64_t total_logs)
{
using namespace Coordination;
using namespace DB;

View File

@ -81,8 +81,9 @@
#define DBMS_MIN_REVISION_WITH_REFERER_IN_CLIENT_INFO 54447
/// Version of ClickHouse TCP protocol. Increment it manually when you change the protocol.
#define DBMS_TCP_PROTOCOL_VERSION 54447
#define DBMS_TCP_PROTOCOL_VERSION 54448
#define DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH 54448
/// The boundary on which the blocks for asynchronous file operations should be aligned.
#define DEFAULT_AIO_FILE_BLOCK_SIZE 4096

View File

@ -441,6 +441,7 @@ class IColumn;
M(Bool, engine_file_truncate_on_insert, false, "Enables or disables truncate before insert in file engine tables", 0) \
M(Bool, allow_experimental_database_replicated, false, "Allow to create databases with Replicated engine", 0) \
M(UInt64, database_replicated_initial_query_timeout_sec, 300, "How long initial DDL query should wait for Replicated database to process previous DDL queue entries", 0) \
M(UInt64, max_distributed_depth, 5, "Maximum distributed query depth", 0) \
M(Bool, database_replicated_always_detach_permanently, false, "Execute DETACH TABLE as DETACH TABLE PERMANENTLY if database engine is Replicated", 0) \
M(DistributedDDLOutputMode, distributed_ddl_output_mode, DistributedDDLOutputMode::THROW, "Format of distributed DDL query result", 0) \
M(UInt64, distributed_ddl_entry_format_version, 1, "Version of DDL entry to write into ZooKeeper", 0) \
@ -454,7 +455,11 @@ class IColumn;
M(UnionMode, union_default_mode, UnionMode::Unspecified, "Set default Union Mode in SelectWithUnion query. Possible values: empty string, 'ALL', 'DISTINCT'. If empty, query without Union Mode will throw exception.", 0) \
M(Bool, optimize_aggregators_of_group_by_keys, true, "Eliminates min/max/any/anyLast aggregators of GROUP BY keys in SELECT section", 0) \
M(Bool, optimize_group_by_function_keys, true, "Eliminates functions of other keys in GROUP BY section", 0) \
\
M(Bool, query_plan_enable_optimizations, true, "Apply optimizations to query plan", 0) \
M(UInt64, query_plan_max_optimizations_to_apply, 10000, "Limit the total number of optimizations applied to query plan. If zero, ignored. If limit reached, throw exception", 0) \
M(Bool, query_plan_filter_push_down, true, "Allow to push down filter by predicate query plan step", 0) \
\
M(Bool, database_replicated_ddl_output, true, "Obsolete setting, does nothing. Will be removed after 2021-09-08", 0) \
// End of COMMON_SETTINGS
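A brief, hedged usage sketch for the settings added above (session-level `SET` statements; the values shown are illustrative, not recommendations):

``` sql
SET query_plan_enable_optimizations = 1;          -- apply optimizations to the query plan (default)
SET query_plan_max_optimizations_to_apply = 1000; -- throw if more optimizations than this would be applied
SET query_plan_filter_push_down = 1;              -- allow pushing filters down past query plan steps
SET max_distributed_depth = 5;                    -- cap nesting of Distributed-over-Distributed queries (default 5)
```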

View File

@ -567,7 +567,11 @@ void HashedDictionary<dictionary_key_type, sparse>::calculateBytesAllocated()
if constexpr (sparse || std::is_same_v<AttributeValueType, Field>)
{
bytes_allocated += container.max_size() * (sizeof(KeyType) + sizeof(AttributeValueType));
/// bucket_count() - Returns table size, that includes empty and deleted
/// size() - Returns table size, w/o empty and deleted
/// and since this is sparsehash, empty cells should not be significant,
/// and since items cannot be removed from the dictionary, deleted is also not important.
bytes_allocated += container.size() * (sizeof(KeyType) + sizeof(AttributeValueType));
bucket_count = container.bucket_count();
}
else

View File

@ -1349,7 +1349,7 @@ ColumnsWithTypeAndName prepareFunctionArguments(const ActionsDAG::NodeRawConstPt
/// Create actions which calculate conjunction of selected nodes.
/// Assume conjunction nodes are predicates (and may be used as arguments of function AND).
///
/// Result actions add single column with conjunction result (it is always last in index).
/// Result actions add single column with conjunction result (it is always first in index).
/// No other columns are added or removed.
ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(NodeRawConstPtrs conjunction, const ColumnsWithTypeAndName & all_inputs)
{
@ -1414,6 +1414,20 @@ ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(NodeRawConstPtrs conjunctio
}
}
const Node * result_predicate = nodes_mapping[*conjunction.begin()];
if (conjunction.size() > 1)
{
NodeRawConstPtrs args;
args.reserve(conjunction.size());
for (const auto * predicate : conjunction)
args.emplace_back(nodes_mapping[predicate]);
result_predicate = &actions->addFunction(func_builder_and, std::move(args), {});
}
actions->index.push_back(result_predicate);
for (const auto & col : all_inputs)
{
const Node * input;
@ -1430,19 +1444,6 @@ ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(NodeRawConstPtrs conjunctio
actions->index.push_back(input);
}
const Node * result_predicate = nodes_mapping[*conjunction.begin()];
if (conjunction.size() > 1)
{
NodeRawConstPtrs args;
args.reserve(conjunction.size());
for (const auto * predicate : conjunction)
args.emplace_back(nodes_mapping[predicate]);
result_predicate = &actions->addFunction(func_builder_and, std::move(args), {});
}
actions->index.push_back(result_predicate);
return actions;
}
@ -1458,6 +1459,11 @@ ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
"Index for ActionsDAG does not contain filter column name {}. DAG:\n{}",
filter_name, dumpDAG());
/// If condition is constant let's do nothing.
/// It means there is nothing to push down or optimization was already applied.
if (predicate->type == ActionType::COLUMN)
return nullptr;
std::unordered_set<const Node *> allowed_nodes;
/// Get input nodes from available_inputs names.
@ -1507,7 +1513,19 @@ ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
node.result_name = std::move(predicate->result_name);
node.result_type = std::move(predicate->result_type);
node.column = node.result_type->createColumnConst(0, 1);
*predicate = std::move(node);
if (predicate->type != ActionType::INPUT)
*predicate = std::move(node);
else
{
/// Special case. We cannot replace input to constant inplace.
/// Because we cannot affect inputs list for actions.
/// So we just add a new constant and update index.
const auto * new_predicate = &addNode(node);
for (auto & index_node : index)
if (index_node == predicate)
index_node = new_predicate;
}
}
removeUnusedActions(false);

View File

@ -220,7 +220,7 @@ public:
/// Create actions which may calculate part of filter using only available_inputs.
/// If nothing may be calculated, returns nullptr.
/// Otherwise, return actions which inputs are from available_inputs.
/// Returned actions add single column which may be used for filter.
/// Returned actions add single column which may be used for filter. Added column will be the first one.
/// Also, replace some nodes of current inputs to constant 1 in case they are filtered.
///
/// @param all_inputs should contain inputs from previous step, which will be used for result actions.
@ -231,9 +231,9 @@ public:
/// Pushed condition: z > 0
/// GROUP BY step will transform columns `x, y, z` -> `sum(x), y, z`
/// If we just add filter step with actions `z -> z > 0` before GROUP BY,
/// columns will be transformed like `x, y, z` -> `z, z > 0, x, y` -(remove filter)-> `z, x, y`.
/// columns will be transformed like `x, y, z` -> `z > 0, z, x, y` -(remove filter)-> `z, x, y`.
/// To avoid it, add inputs from `all_inputs` list,
/// so actions `x, y, z -> x, y, z, z > 0` -(remove filter)-> `x, y, z` will not change columns order.
/// so actions `x, y, z -> z > 0, x, y, z` -(remove filter)-> `x, y, z` will not change columns order.
ActionsDAGPtr cloneActionsForFilterPushDown(
const std::string & filter_name,
bool can_remove_filter,

View File

@ -60,6 +60,9 @@ void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision)
if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO)
writeBinary(quota_key, out);
if (server_protocol_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH)
writeVarUInt(distributed_depth, out);
if (interface == Interface::TCP)
{
if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH)
@ -137,6 +140,9 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision)
if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO)
readBinary(quota_key, in);
if (client_protocol_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH)
readVarUInt(distributed_depth, in);
if (interface == Interface::TCP)
{
if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH)
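The `distributed_depth` field follows the usual revision-gating pattern: it is written and read only when the peer's protocol revision is new enough, which keeps old clients and servers interoperable. A hedged sketch of the idea, with an assumed revision constant and fixed-width integers standing in for the real varint encoding:

```cpp
#include <cstdint>
#include <ostream>

/// Assumed value for illustration only; the real constant is project-defined.
constexpr uint64_t MIN_PROTOCOL_WITH_DISTRIBUTED_DEPTH = 54448;

void writeDistributedDepth(std::ostream & out, uint64_t peer_revision, uint64_t distributed_depth)
{
    /// The field is serialized only when the peer understands it.
    if (peer_revision >= MIN_PROTOCOL_WITH_DISTRIBUTED_DEPTH)
        out.write(reinterpret_cast<const char *>(&distributed_depth), sizeof(distributed_depth));
    /// Older peers never see the field; the reader must apply the exact same
    /// check, otherwise the stream goes out of sync.
}
```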

View File

@ -96,6 +96,8 @@ public:
/// Common
String quota_key;
UInt64 distributed_depth = 0;
bool empty() const { return query_kind == QueryKind::NO_QUERY; }
/** Serialization and deserialization.

View File

@ -16,6 +16,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
}
namespace ClusterProxy
{
@ -92,6 +97,9 @@ void executeQuery(
const Settings & settings = context.getSettingsRef();
if (settings.max_distributed_depth && context.getClientInfo().distributed_depth > settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
std::vector<QueryPlanPtr> plans;
Pipes remote_pipes;
Pipes delayed_pipes;
@ -100,6 +108,8 @@ void executeQuery(
auto new_context = updateSettingsForCluster(*query_info.cluster, context, settings, log);
new_context->getClientInfo().distributed_depth += 1;
ThrottlerPtr user_level_throttler;
if (auto * process_list_element = context.getProcessListElement())
user_level_throttler = process_list_element->getUserNetworkThrottler();
@ -156,8 +166,7 @@ void executeQuery(
for (auto & plan : plans)
input_streams.emplace_back(plan->getCurrentDataStream());
auto header = input_streams.front().header;
auto union_step = std::make_unique<UnionStep>(std::move(input_streams), header);
auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
query_plan.unitePlans(std::move(union_step), std::move(plans));
}
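The depth check above is what breaks infinite recursion when a Distributed table points (directly or through a cycle) back at itself: each hop increments the depth carried in ClientInfo and the query is rejected once the limit is exceeded. A minimal sketch, assuming a hypothetical helper rather than the real API:

```cpp
#include <cstdint>
#include <stdexcept>

/// Every hop through a Distributed table bumps the depth that travels with the
/// query, so a self-referencing cluster topology cannot recurse forever.
void checkAndBumpDistributedDepth(uint64_t & distributed_depth, uint64_t max_distributed_depth)
{
    /// A limit of 0 means "unlimited", matching the setting's semantics above.
    if (max_distributed_depth && distributed_depth > max_distributed_depth)
        throw std::runtime_error("Maximum distributed depth exceeded");
    ++distributed_depth; /// forwarded to the next hop together with the query
}
```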

View File

@ -75,7 +75,7 @@ struct TableWithColumnNamesAndTypes
void addMaterializedColumns(const NamesAndTypesList & addition)
{
addAdditionalColumns(alias_columns, addition);
addAdditionalColumns(materialized_columns, addition);
}
private:

View File

@ -152,7 +152,7 @@ void DatabaseCatalog::loadDatabases()
/// Another background thread which drops temporary LiveViews.
/// We should start it after loadMarkedAsDroppedTables() to avoid race condition.
TemporaryLiveViewCleaner::instance().startupIfNecessary();
TemporaryLiveViewCleaner::instance().startup();
}
void DatabaseCatalog::shutdownImpl()

View File

@ -134,6 +134,8 @@ class HashJoin : public IJoin
public:
HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_ = false);
const TableJoin & getTableJoin() const override { return *table_join; }
/** Add block of data from right hand of JOIN to the map.
* Returns false, if some limit was exceeded and you should not insert more data.
*/

View File

@ -14,11 +14,15 @@ class Block;
struct ExtraBlock;
using ExtraBlockPtr = std::shared_ptr<ExtraBlock>;
class TableJoin;
class IJoin
{
public:
virtual ~IJoin() = default;
virtual const TableJoin & getTableJoin() const = 0;
/// Add block of data from right hand of JOIN.
/// @returns false, if some limit was exceeded and you should not insert more data.
virtual bool addJoinedBlock(const Block & block, bool check_limits = true) = 0;

View File

@ -37,7 +37,6 @@
#include <Interpreters/replaceAliasColumnsInQuery.h>
#include <Processors/Pipe.h>
#include <Processors/QueryPlan/AddingDelayedSourceStep.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
@ -1081,26 +1080,14 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
if (expressions.hasJoin())
{
JoinPtr join = expressions.join;
QueryPlanStepPtr join_step = std::make_unique<JoinStep>(
query_plan.getCurrentDataStream(),
expressions.join);
expressions.join,
expressions.join_has_delayed_stream,
settings.max_block_size);
join_step->setStepDescription("JOIN");
query_plan.addStep(std::move(join_step));
if (expressions.join_has_delayed_stream)
{
const Block & join_result_sample = query_plan.getCurrentDataStream().header;
auto stream = std::make_shared<LazyNonJoinedBlockInputStream>(*join, join_result_sample, settings.max_block_size);
auto source = std::make_shared<SourceFromInputStream>(std::move(stream));
auto add_non_joined_rows_step = std::make_unique<AddingDelayedSourceStep>(
query_plan.getCurrentDataStream(), std::move(source));
add_non_joined_rows_step->setStepDescription("Add non-joined rows after JOIN");
query_plan.addStep(std::move(add_non_joined_rows_step));
}
}
if (expressions.hasWhere())

View File

@ -6,6 +6,7 @@
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/queryToString.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/UnionStep.h>
@ -251,11 +252,23 @@ void InterpreterSelectWithUnionQuery::buildQueryPlan(QueryPlan & query_plan)
{
plans[i] = std::make_unique<QueryPlan>();
nested_interpreters[i]->buildQueryPlan(*plans[i]);
if (!blocksHaveEqualStructure(plans[i]->getCurrentDataStream().header, result_header))
{
auto actions_dag = ActionsDAG::makeConvertingActions(
plans[i]->getCurrentDataStream().header.getColumnsWithTypeAndName(),
result_header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Position);
auto converting_step = std::make_unique<ExpressionStep>(plans[i]->getCurrentDataStream(), std::move(actions_dag));
converting_step->setStepDescription("Conversion before UNION");
plans[i]->addStep(std::move(converting_step));
}
data_streams[i] = plans[i]->getCurrentDataStream();
}
auto max_threads = context->getSettingsRef().max_threads;
auto union_step = std::make_unique<UnionStep>(std::move(data_streams), result_header, max_threads);
auto union_step = std::make_unique<UnionStep>(std::move(data_streams), max_threads);
query_plan.unitePlans(std::move(union_step), std::move(plans));

View File

@ -19,6 +19,8 @@ class JoinSwitcher : public IJoin
public:
JoinSwitcher(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_);
const TableJoin & getTableJoin() const override { return *table_join; }
/// Add block of data from right hand of JOIN into current join object.
/// If the join-in-memory memory limit is exceeded, switches to join-on-disk and continues with it.
/// @returns false if the join-on-disk disk limit is exceeded

View File

@ -23,6 +23,7 @@ class MergeJoin : public IJoin
public:
MergeJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block);
const TableJoin & getTableJoin() const override { return *table_join; }
bool addJoinedBlock(const Block & block, bool check_limits) override;
void joinBlock(Block &, ExtraBlockPtr & not_processed) override;
void joinTotals(Block &) const override;

View File

@ -184,12 +184,13 @@ private:
// synchronous log flushing for SYSTEM FLUSH LOGS.
uint64_t queue_front_index = 0;
bool is_shutdown = false;
// A flag that says we must create the tables even if the queue is empty.
bool is_force_prepare_tables = false;
std::condition_variable flush_event;
// Requested to flush logs up to this index, exclusive
uint64_t requested_flush_before = 0;
uint64_t requested_flush_up_to = 0;
// Flushed log up to this index, exclusive
uint64_t flushed_before = 0;
uint64_t flushed_up_to = 0;
// Logged overflow message at this queue front index
uint64_t logged_queue_full_at_index = -1;
@ -267,8 +268,8 @@ void SystemLog<LogElement>::add(const LogElement & element)
// It is enough to only wake the flushing thread once, after the message
// count increases past half available size.
const uint64_t queue_end = queue_front_index + queue.size();
if (requested_flush_before < queue_end)
requested_flush_before = queue_end;
if (requested_flush_up_to < queue_end)
requested_flush_up_to = queue_end;
flush_event.notify_all();
}
@ -304,24 +305,36 @@ void SystemLog<LogElement>::add(const LogElement & element)
template <typename LogElement>
void SystemLog<LogElement>::flush(bool force)
{
std::unique_lock lock(mutex);
uint64_t this_thread_requested_offset;
if (is_shutdown)
return;
const uint64_t queue_end = queue_front_index + queue.size();
is_force_prepare_tables = force;
if (requested_flush_before < queue_end || force)
{
requested_flush_before = queue_end;
std::unique_lock lock(mutex);
if (is_shutdown)
return;
this_thread_requested_offset = queue_front_index + queue.size();
// Publish our flush request, taking care not to overwrite the requests
// made by other threads.
is_force_prepare_tables |= force;
requested_flush_up_to = std::max(requested_flush_up_to,
this_thread_requested_offset);
flush_event.notify_all();
}
// Use an arbitrary timeout to avoid endless waiting.
const int timeout_seconds = 60;
LOG_DEBUG(log, "Requested flush up to offset {}",
this_thread_requested_offset);
// Use an arbitrary timeout to avoid endless waiting. 60s proved to be
// too fast for our parallel functional tests, probably because they
// heavily load the disk.
const int timeout_seconds = 180;
std::unique_lock lock(mutex);
bool result = flush_event.wait_for(lock, std::chrono::seconds(timeout_seconds),
[&] { return flushed_before >= queue_end && !is_force_prepare_tables; });
[&] { return flushed_up_to >= this_thread_requested_offset
&& !is_force_prepare_tables; });
if (!result)
{
@ -371,6 +384,8 @@ void SystemLog<LogElement>::savingThreadFunction()
// The end index (exclusive, like std end()) of the messages we are
// going to flush.
uint64_t to_flush_end = 0;
// Should we prepare table even if there are no new messages.
bool should_prepare_tables_anyway = false;
{
std::unique_lock lock(mutex);
@ -378,7 +393,7 @@ void SystemLog<LogElement>::savingThreadFunction()
std::chrono::milliseconds(flush_interval_milliseconds),
[&] ()
{
return requested_flush_before > flushed_before || is_shutdown || is_force_prepare_tables;
return requested_flush_up_to > flushed_up_to || is_shutdown || is_force_prepare_tables;
}
);
@ -389,18 +404,14 @@ void SystemLog<LogElement>::savingThreadFunction()
to_flush.resize(0);
queue.swap(to_flush);
should_prepare_tables_anyway = is_force_prepare_tables;
exit_this_thread = is_shutdown;
}
if (to_flush.empty())
{
bool force;
{
std::lock_guard lock(mutex);
force = is_force_prepare_tables;
}
if (force)
if (should_prepare_tables_anyway)
{
prepareTable();
LOG_TRACE(log, "Table created (force)");
@ -429,7 +440,8 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
{
try
{
LOG_TRACE(log, "Flushing system log, {} entries to flush", to_flush.size());
LOG_TRACE(log, "Flushing system log, {} entries to flush up to offset {}",
to_flush.size(), to_flush_end);
/// We check for existence of the table and create it as needed at every
/// flush. This is done to allow user to drop the table at any moment
@ -468,12 +480,12 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
{
std::lock_guard lock(mutex);
flushed_before = to_flush_end;
flushed_up_to = to_flush_end;
is_force_prepare_tables = false;
flush_event.notify_all();
}
LOG_TRACE(log, "Flushed system log");
LOG_TRACE(log, "Flushed system log up to offset {}", to_flush_end);
}
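The reworked flush protocol hinges on each caller remembering the offset it personally requested, so a later, larger request from another thread cannot change what this thread waits for. A condensed sketch of that handshake, with simplified types standing in for the real SystemLog state:

```cpp
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

struct FlushState
{
    std::mutex mutex;
    std::condition_variable flush_event;
    uint64_t queue_end = 0;              /// one past the last queued message
    uint64_t requested_flush_up_to = 0;  /// what writers asked the flusher to reach
    uint64_t flushed_up_to = 0;          /// what the flusher has actually reached
};

bool waitForFlush(FlushState & state, int timeout_seconds)
{
    uint64_t my_offset;
    {
        std::unique_lock lock(state.mutex);
        my_offset = state.queue_end;
        /// Publish the request without clobbering larger requests from other threads.
        state.requested_flush_up_to = std::max(state.requested_flush_up_to, my_offset);
        state.flush_event.notify_all();
    }
    std::unique_lock lock(state.mutex);
    /// Wait only for our own offset; returns false on timeout.
    return state.flush_event.wait_for(lock, std::chrono::seconds(timeout_seconds),
        [&] { return state.flushed_up_to >= my_offset; });
}
```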

View File

@ -8,9 +8,35 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
InputPorts createInputPorts(
const Block & header,
size_t num_ports,
IProcessor::PortNumbers delayed_ports,
bool assert_main_ports_empty)
{
if (!assert_main_ports_empty)
return InputPorts(num_ports, header);
InputPorts res;
std::sort(delayed_ports.begin(), delayed_ports.end());
size_t next_delayed_port = 0;
for (size_t i = 0; i < num_ports; ++i)
{
if (next_delayed_port < delayed_ports.size() && i == delayed_ports[next_delayed_port])
{
res.emplace_back(header);
++next_delayed_port;
}
else
res.emplace_back(Block());
}
return res;
}
DelayedPortsProcessor::DelayedPortsProcessor(
const Block & header, size_t num_ports, const PortNumbers & delayed_ports, bool assert_main_ports_empty)
: IProcessor(InputPorts(num_ports, header),
: IProcessor(createInputPorts(header, num_ports, delayed_ports, assert_main_ports_empty),
OutputPorts((assert_main_ports_empty ? delayed_ports.size() : num_ports), header))
, num_delayed_ports(delayed_ports.size())
{

View File

@ -7,6 +7,7 @@
#include <Formats/verbosePrintString.h>
#include <Formats/FormatFactory.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/Serializations/SerializationNullable.h>
namespace DB
@ -338,8 +339,10 @@ void TabSeparatedRowInputFormat::tryDeserializeField(const DataTypePtr & type, I
const auto & index = column_mapping->column_indexes_for_input_fields[file_column];
if (index)
{
bool can_be_parsed_as_null = removeLowCardinality(type)->isNullable();
// Check for a null value when the type is not nullable. For simplicity, don't cross the buffer boundary, so some cases may be missed.
if (!type->isNullable() && !in.eof())
if (!can_be_parsed_as_null && !in.eof())
{
if (*in.position() == '\\' && in.available() >= 2)
{

View File

@ -252,10 +252,10 @@ static Pipes removeEmptyPipes(Pipes pipes)
Pipe Pipe::unitePipes(Pipes pipes)
{
return Pipe::unitePipes(std::move(pipes), nullptr);
return Pipe::unitePipes(std::move(pipes), nullptr, false);
}
Pipe Pipe::unitePipes(Pipes pipes, Processors * collected_processors)
Pipe Pipe::unitePipes(Pipes pipes, Processors * collected_processors, bool allow_empty_header)
{
Pipe res;
@ -275,12 +275,25 @@ Pipe Pipe::unitePipes(Pipes pipes, Processors * collected_processors)
OutputPortRawPtrs totals;
OutputPortRawPtrs extremes;
res.header = pipes.front().header;
res.collected_processors = collected_processors;
res.header = pipes.front().header;
if (allow_empty_header && !res.header)
{
for (const auto & pipe : pipes)
{
if (const auto & header = pipe.getHeader())
{
res.header = header;
break;
}
}
}
for (auto & pipe : pipes)
{
assertBlocksHaveEqualStructure(res.header, pipe.header, "Pipe::unitePipes");
if (!allow_empty_header || pipe.header)
assertBlocksHaveEqualStructure(res.header, pipe.header, "Pipe::unitePipes");
res.processors.insert(res.processors.end(), pipe.processors.begin(), pipe.processors.end());
res.output_ports.insert(res.output_ports.end(), pipe.output_ports.begin(), pipe.output_ports.end());

View File

@ -155,7 +155,7 @@ private:
/// This methods are for QueryPipeline. It is allowed to complete graph only there.
/// So, we may be sure that Pipe always has output port if not empty.
bool isCompleted() const { return !empty() && output_ports.empty(); }
static Pipe unitePipes(Pipes pipes, Processors * collected_processors);
static Pipe unitePipes(Pipes pipes, Processors * collected_processors, bool allow_empty_header);
void setSinks(const Pipe::ProcessorGetterWithStreamKind & getter);
void setOutputFormat(ProcessorPtr output);

View File

@ -211,11 +211,14 @@ void QueryPipeline::setOutputFormat(ProcessorPtr output)
QueryPipeline QueryPipeline::unitePipelines(
std::vector<std::unique_ptr<QueryPipeline>> pipelines,
const Block & common_header,
const ExpressionActionsSettings & settings,
size_t max_threads_limit,
Processors * collected_processors)
{
if (pipelines.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unite an empty set of pipelines");
Block common_header = pipelines.front()->getHeader();
/// Should we limit the number of threads for united pipeline. True if all pipelines have max_threads != 0.
/// If true, result max_threads will be sum(max_threads).
/// Note: it may be greater than settings.max_threads, so we should apply this limit again.
@ -229,19 +232,7 @@ QueryPipeline QueryPipeline::unitePipelines(
pipeline.checkInitialized();
pipeline.pipe.collected_processors = collected_processors;
if (!pipeline.isCompleted())
{
auto actions_dag = ActionsDAG::makeConvertingActions(
pipeline.getHeader().getColumnsWithTypeAndName(),
common_header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Position);
auto actions = std::make_shared<ExpressionActions>(actions_dag, settings);
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, actions);
});
}
assertBlocksHaveEqualStructure(pipeline.getHeader(), common_header, "QueryPipeline::unitePipelines");
pipes.emplace_back(std::move(pipeline.pipe));
@ -255,7 +246,7 @@ QueryPipeline QueryPipeline::unitePipelines(
}
QueryPipeline pipeline;
pipeline.init(Pipe::unitePipes(std::move(pipes), collected_processors));
pipeline.init(Pipe::unitePipes(std::move(pipes), collected_processors, false));
if (will_limit_max_threads)
{
@ -289,7 +280,9 @@ void QueryPipeline::addCreatingSetsTransform(const Block & res_header, SubqueryF
void QueryPipeline::addPipelineBefore(QueryPipeline pipeline)
{
checkInitializedAndNotCompleted();
assertBlocksHaveEqualStructure(getHeader(), pipeline.getHeader(), "QueryPipeline");
if (pipeline.getHeader())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for CreatingSets should have empty header. Got: {}",
pipeline.getHeader().dumpStructure());
IProcessor::PortNumbers delayed_streams(pipe.numOutputPorts());
for (size_t i = 0; i < delayed_streams.size(); ++i)
@ -300,7 +293,7 @@ void QueryPipeline::addPipelineBefore(QueryPipeline pipeline)
Pipes pipes;
pipes.emplace_back(std::move(pipe));
pipes.emplace_back(QueryPipeline::getPipe(std::move(pipeline)));
pipe = Pipe::unitePipes(std::move(pipes), collected_processors);
pipe = Pipe::unitePipes(std::move(pipes), collected_processors, true);
auto processor = std::make_shared<DelayedPortsProcessor>(getHeader(), pipe.numOutputPorts(), delayed_streams, true);
addTransform(std::move(processor));

View File

@ -90,13 +90,12 @@ public:
/// If collector is used, it will collect only newly-added processors, but not processors from pipelines.
static QueryPipeline unitePipelines(
std::vector<std::unique_ptr<QueryPipeline>> pipelines,
const Block & common_header,
const ExpressionActionsSettings & settings,
size_t max_threads_limit = 0,
Processors * collected_processors = nullptr);
/// Add other pipeline and execute it before current one.
/// Pipeline must have same header.
/// Pipeline must have an empty header; it should not generate any chunks.
/// This is used for CreatingSets.
void addPipelineBefore(QueryPipeline pipeline);
void addCreatingSetsTransform(const Block & res_header, SubqueryForSet subquery_for_set, const SizeLimits & limits, const Context & context);

View File

@ -1,42 +0,0 @@
#include <Processors/QueryPlan/AddingDelayedSourceStep.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false,
.returns_single_stream = false,
.preserves_number_of_streams = false,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = false, /// New rows are added from delayed stream
}
};
}
AddingDelayedSourceStep::AddingDelayedSourceStep(
const DataStream & input_stream_,
ProcessorPtr source_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, source(std::move(source_))
{
}
void AddingDelayedSourceStep::transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &)
{
source->setQueryPlanStep(this);
pipeline.addDelayedStream(source);
/// Now, after adding delayed stream, it has implicit dependency on other port.
/// Here we add resize processor to remove this dependency.
/// Otherwise, if we add MergeSorting + MergingSorted transform to pipeline, we could get `Pipeline stuck`
pipeline.resize(pipeline.getNumStreams(), true);
}
}

View File

@ -1,28 +0,0 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
namespace DB
{
class IProcessor;
using ProcessorPtr = std::shared_ptr<IProcessor>;
/// Adds another source to pipeline. Data from this source will be read after data from all other sources.
/// NOTE: this step is needed because of non-joined data from JOIN. Remove this step after adding JoinStep.
class AddingDelayedSourceStep : public ITransformingStep
{
public:
AddingDelayedSourceStep(
const DataStream & input_stream_,
ProcessorPtr source_);
String getName() const override { return "AddingDelayedSource"; }
void transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &) override;
private:
ProcessorPtr source;
};
}

View File

@ -30,12 +30,11 @@ static ITransformingStep::Traits getTraits()
CreatingSetStep::CreatingSetStep(
const DataStream & input_stream_,
Block header,
String description_,
SubqueryForSet subquery_for_set_,
SizeLimits network_transfer_limits_,
const Context & context_)
: ITransformingStep(input_stream_, header, getTraits())
: ITransformingStep(input_stream_, Block{}, getTraits())
, description(std::move(description_))
, subquery_for_set(std::move(subquery_for_set_))
, network_transfer_limits(std::move(network_transfer_limits_))
@ -70,10 +69,12 @@ CreatingSetsStep::CreatingSetsStep(DataStreams input_streams_)
output_stream = input_streams.front();
for (size_t i = 1; i < input_streams.size(); ++i)
assertBlocksHaveEqualStructure(output_stream->header, input_streams[i].header, "CreatingSets");
if (input_streams[i].header)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Creating set input must have empty header. Got: {}",
input_streams[i].header.dumpStructure());
}
QueryPipelinePtr CreatingSetsStep::updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings & settings)
QueryPipelinePtr CreatingSetsStep::updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings &)
{
if (pipelines.empty())
throw Exception("CreatingSetsStep cannot be created with no inputs", ErrorCodes::LOGICAL_ERROR);
@ -82,14 +83,13 @@ QueryPipelinePtr CreatingSetsStep::updatePipeline(QueryPipelines pipelines, cons
if (pipelines.size() == 1)
return main_pipeline;
std::swap(pipelines.front(), pipelines.back());
pipelines.pop_back();
pipelines.erase(pipelines.begin());
QueryPipeline delayed_pipeline;
if (pipelines.size() > 1)
{
QueryPipelineProcessorsCollector collector(delayed_pipeline, this);
delayed_pipeline = QueryPipeline::unitePipelines(std::move(pipelines), output_stream->header, settings.getActionsSettings());
delayed_pipeline = QueryPipeline::unitePipelines(std::move(pipelines));
processors = collector.detachProcessors();
}
else
@ -129,7 +129,6 @@ void addCreatingSetsStep(
auto creating_set = std::make_unique<CreatingSetStep>(
plan->getCurrentDataStream(),
input_streams.front().header,
std::move(description),
std::move(set),
limits,

View File

@ -12,7 +12,6 @@ class CreatingSetStep : public ITransformingStep
public:
CreatingSetStep(
const DataStream & input_stream_,
Block header,
String description_,
SubqueryForSet subquery_for_set_,
SizeLimits network_transfer_limits_,
@ -38,7 +37,7 @@ public:
String getName() const override { return "CreatingSets"; }
QueryPipelinePtr updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings & settings) override;
QueryPipelinePtr updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings &) override;
void describePipeline(FormatSettings & settings) const override;

View File

@ -4,6 +4,8 @@
#include <Processors/Transforms/JoiningTransform.h>
#include <Interpreters/ExpressionActions.h>
#include <IO/Operators.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Interpreters/JoinSwitcher.h>
namespace DB
{
@ -108,12 +110,14 @@ void ExpressionStep::describeActions(FormatSettings & settings) const
settings.out << '\n';
}
JoinStep::JoinStep(const DataStream & input_stream_, JoinPtr join_)
JoinStep::JoinStep(const DataStream & input_stream_, JoinPtr join_, bool has_non_joined_rows_, size_t max_block_size_)
: ITransformingStep(
input_stream_,
Transform::transformHeader(input_stream_.header, join_),
getJoinTraits())
, join(std::move(join_))
, has_non_joined_rows(has_non_joined_rows_)
, max_block_size(max_block_size_)
{
}
@ -132,6 +136,21 @@ void JoinStep::transformPipeline(QueryPipeline & pipeline, const BuildQueryPipel
bool on_totals = stream_type == QueryPipeline::StreamType::Totals;
return std::make_shared<Transform>(header, join, on_totals, add_default_totals);
});
if (has_non_joined_rows)
{
const Block & join_result_sample = pipeline.getHeader();
auto stream = std::make_shared<LazyNonJoinedBlockInputStream>(*join, join_result_sample, max_block_size);
auto source = std::make_shared<SourceFromInputStream>(std::move(stream));
source->setQueryPlanStep(this);
pipeline.addDelayedStream(source);
/// Now, after adding delayed stream, it has implicit dependency on other port.
/// Here we add resize processor to remove this dependency.
/// Otherwise, if we add MergeSorting + MergingSorted transform to pipeline, we could get `Pipeline stuck`
pipeline.resize(pipeline.getNumStreams(), true);
}
}
}

View File

@ -40,13 +40,17 @@ class JoinStep : public ITransformingStep
public:
using Transform = JoiningTransform;
explicit JoinStep(const DataStream & input_stream_, JoinPtr join_);
explicit JoinStep(const DataStream & input_stream_, JoinPtr join_, bool has_non_joined_rows_, size_t max_block_size_);
String getName() const override { return "Join"; }
void transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &) override;
const JoinPtr & getJoin() const { return join; }
private:
JoinPtr join;
bool has_non_joined_rows;
size_t max_block_size;
};
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <array>
namespace DB
@ -23,6 +24,7 @@ struct Optimization
using Function = size_t (*)(QueryPlan::Node *, QueryPlan::Nodes &);
const Function apply = nullptr;
const char * name;
const bool QueryPlanOptimizationSettings::* const is_enabled;
};
/// Move ARRAY JOIN up if possible.
@ -46,11 +48,11 @@ inline const auto & getOptimizations()
{
static const std::array<Optimization, 5> optimizations =
{{
{tryLiftUpArrayJoin, "liftUpArrayJoin"},
{tryPushDownLimit, "pushDownLimit"},
{trySplitFilter, "splitFilter"},
{tryMergeExpressions, "mergeExpressions"},
{tryPushDownFilter, "pushDownFilter"},
{tryLiftUpArrayJoin, "liftUpArrayJoin", &QueryPlanOptimizationSettings::optimize_plan},
{tryPushDownLimit, "pushDownLimit", &QueryPlanOptimizationSettings::optimize_plan},
{trySplitFilter, "splitFilter", &QueryPlanOptimizationSettings::optimize_plan},
{tryMergeExpressions, "mergeExpressions", &QueryPlanOptimizationSettings::optimize_plan},
{tryPushDownFilter, "pushDownFilter", &QueryPlanOptimizationSettings::filter_push_down},
}};
return optimizations;

View File

@ -8,7 +8,9 @@ namespace DB
QueryPlanOptimizationSettings QueryPlanOptimizationSettings::fromSettings(const Settings & from)
{
QueryPlanOptimizationSettings settings;
settings.optimize_plan = from.query_plan_enable_optimizations;
settings.max_optimizations_to_apply = from.query_plan_max_optimizations_to_apply;
settings.filter_push_down = from.query_plan_filter_push_down;
return settings;
}

View File

@ -14,6 +14,12 @@ struct QueryPlanOptimizationSettings
/// It helps to avoid infinite optimization loop.
size_t max_optimizations_to_apply = 0;
/// If disabled, no optimizations are applied.
bool optimize_plan = true;
/// If filter push down optimization is enabled.
bool filter_push_down = true;
static QueryPlanOptimizationSettings fromSettings(const Settings & from);
static QueryPlanOptimizationSettings fromContext(const Context & from);
};

View File

@ -4,6 +4,7 @@
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/CubeStep.h>
#include <Processors/QueryPlan/FinishSortingStep.h>
#include <Processors/QueryPlan/MergeSortingStep.h>
@ -11,8 +12,10 @@
#include <Processors/QueryPlan/PartialSortingStep.h>
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/TableJoin.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeAggregateFunction.h>
@ -73,8 +76,8 @@ static size_t tryAddNewFilterStep(
child_node->children.emplace_back(&node);
/// Expression/Filter -> Aggregating -> Filter -> Something
/// New filter column is added to the end.
auto split_filter_column_name = (*split_filter->getIndex().rbegin())->result_name;
/// New filter column is the first one.
auto split_filter_column_name = (*split_filter->getIndex().begin())->result_name;
node.step = std::make_unique<FilterStep>(
node.children.at(0)->step->getOutputStream(),
std::move(split_filter), std::move(split_filter_column_name), true);
@ -82,7 +85,7 @@ static size_t tryAddNewFilterStep(
return 3;
}
static Names getAggregatinKeys(const Aggregator::Params & params)
static Names getAggregatingKeys(const Aggregator::Params & params)
{
Names keys;
keys.reserve(params.keys.size());
@ -112,17 +115,36 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
if (auto * aggregating = typeid_cast<AggregatingStep *>(child.get()))
{
const auto & params = aggregating->getParams();
Names keys = getAggregatinKeys(params);
Names keys = getAggregatingKeys(params);
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, keys))
return updated_steps;
}
if (typeid_cast<CreatingSetsStep *>(child.get()))
{
/// CreatingSets does not change header.
/// We can push down filter and update header.
///                       - Something
/// Filter - CreatingSets - CreatingSet
///                       - CreatingSet
auto input_streams = child->getInputStreams();
input_streams.front() = filter->getOutputStream();
child = std::make_unique<CreatingSetsStep>(input_streams);
std::swap(parent, child);
std::swap(parent_node->children, child_node->children);
std::swap(parent_node->children.front(), child_node->children.front());
///              - Filter - Something
/// CreatingSets - CreatingSet
///              - CreatingSet
return 2;
}
if (auto * totals_having = typeid_cast<TotalsHavingStep *>(child.get()))
{
/// If totals step has HAVING expression, skip it for now.
/// TODO:
/// We can merge HAING expression with current filer.
/// We can merge HAVING expression with current filter.
/// Also, we can push down part of HAVING which depend only on aggregation keys.
if (totals_having->getActions())
return 0;
@ -168,6 +190,36 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
return updated_steps;
}
if (auto * join = typeid_cast<JoinStep *>(child.get()))
{
const auto & table_join = join->getJoin()->getTableJoin();
/// Push down is for left table only. We need to update JoinStep for push down into right.
/// Only inner and left join are supported. Other types may generate default values for left table keys.
/// So, if we push down a condition like `key != 0`, not all rows may be filtered.
if (table_join.kind() == ASTTableJoin::Kind::Inner || table_join.kind() == ASTTableJoin::Kind::Left)
{
const auto & left_header = join->getInputStreams().front().header;
const auto & res_header = join->getOutputStream().header;
Names allowed_keys;
for (const auto & name : table_join.keyNamesLeft())
{
/// Skip key if it is renamed.
/// I don't know if it is possible. Just in case.
if (!left_header.has(name) || !res_header.has(name))
continue;
/// Skip if type is changed. Push down expression expects equal types.
if (!left_header.getByName(name).type->equals(*res_header.getByName(name).type))
continue;
allowed_keys.push_back(name);
}
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_keys))
return updated_steps;
}
}
/// TODO.
/// We can filter earlier if the expression does not depend on WITH FILL columns.
/// But we cannot just push down the condition, because other columns may be filled with defaults.
@ -193,6 +245,48 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
return updated_steps;
}
if (auto * union_step = typeid_cast<UnionStep *>(child.get()))
{
/// Union does not change header.
/// We can push down filter and update header.
auto union_input_streams = child->getInputStreams();
for (auto & input_stream : union_input_streams)
input_stream.header = filter->getOutputStream().header;
///                - Something
/// Filter - Union - Something
///                - Something
child = std::make_unique<UnionStep>(union_input_streams, union_step->getMaxThreads());
std::swap(parent, child);
std::swap(parent_node->children, child_node->children);
std::swap(parent_node->children.front(), child_node->children.front());
///       - Filter - Something
/// Union - Something
///       - Something
for (size_t i = 1; i < parent_node->children.size(); ++i)
{
auto & filter_node = nodes.emplace_back();
filter_node.children.push_back(parent_node->children[i]);
parent_node->children[i] = &filter_node;
filter_node.step = std::make_unique<FilterStep>(
filter_node.children.front()->step->getOutputStream(),
filter->getExpression()->clone(),
filter->getFilterColumnName(),
filter->removesFilterColumn());
}
///       - Filter - Something
/// Union - Filter - Something
///       - Filter - Something
return 3;
}
return 0;
}

View File

@ -16,6 +16,9 @@ namespace QueryPlanOptimizations
void optimizeTree(const QueryPlanOptimizationSettings & settings, QueryPlan::Node & root, QueryPlan::Nodes & nodes)
{
if (!settings.optimize_plan)
return;
const auto & optimizations = getOptimizations();
struct Frame
@ -63,6 +66,9 @@ void optimizeTree(const QueryPlanOptimizationSettings & settings, QueryPlan::Nod
/// Apply all optimizations.
for (const auto & optimization : optimizations)
{
if (!(settings.*(optimization.is_enabled)))
continue;
/// Just in case, skip optimization if it is not initialized.
if (!optimization.apply)
continue;

View File

@ -6,8 +6,25 @@
namespace DB
{
UnionStep::UnionStep(DataStreams input_streams_, Block result_header, size_t max_threads_)
: header(std::move(result_header))
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static Block checkHeaders(const DataStreams & input_streams)
{
if (input_streams.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unite an empty set of query plan steps");
Block res = input_streams.front().header;
for (const auto & stream : input_streams)
assertBlocksHaveEqualStructure(stream.header, res, "UnionStep");
return res;
}
UnionStep::UnionStep(DataStreams input_streams_, size_t max_threads_)
: header(checkHeaders(input_streams_))
, max_threads(max_threads_)
{
input_streams = std::move(input_streams_);
@ -18,7 +35,7 @@ UnionStep::UnionStep(DataStreams input_streams_, Block result_header, size_t max
output_stream = DataStream{.header = header};
}
QueryPipelinePtr UnionStep::updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings & settings)
QueryPipelinePtr UnionStep::updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings &)
{
auto pipeline = std::make_unique<QueryPipeline>();
QueryPipelineProcessorsCollector collector(*pipeline, this);
@ -30,7 +47,7 @@ QueryPipelinePtr UnionStep::updatePipeline(QueryPipelines pipelines, const Build
return pipeline;
}
*pipeline = QueryPipeline::unitePipelines(std::move(pipelines), output_stream->header, settings.getActionsSettings(), max_threads);
*pipeline = QueryPipeline::unitePipelines(std::move(pipelines), max_threads);
processors = collector.detachProcessors();
return pipeline;

View File

@ -9,14 +9,16 @@ class UnionStep : public IQueryPlanStep
{
public:
/// max_threads is used to limit the number of threads for result pipeline.
UnionStep(DataStreams input_streams_, Block result_header, size_t max_threads_ = 0);
explicit UnionStep(DataStreams input_streams_, size_t max_threads_ = 0);
String getName() const override { return "Union"; }
QueryPipelinePtr updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings & settings) override;
QueryPipelinePtr updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings &) override;
void describePipeline(FormatSettings & settings) const override;
size_t getMaxThreads() const { return max_threads; }
private:
Block header;
size_t max_threads;

View File

@ -93,7 +93,6 @@ SRCS(
Pipe.cpp
Port.cpp
QueryPipeline.cpp
QueryPlan/AddingDelayedSourceStep.cpp
QueryPlan/AggregatingStep.cpp
QueryPlan/ArrayJoinStep.cpp
QueryPlan/BuildQueryPipelineSettings.cpp

View File

@ -9,6 +9,8 @@
#include <Common/quoteString.h>
#include <Common/hex.h>
#include <Common/ActionBlocker.h>
#include <Common/formatReadable.h>
#include <Common/Stopwatch.h>
#include <common/StringRef.h>
#include <Interpreters/Context.h>
#include <Interpreters/Cluster.h>
@ -523,7 +525,7 @@ bool StorageDistributedDirectoryMonitor::processFiles(const std::map<UInt64, std
void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path)
{
LOG_TRACE(log, "Started processing `{}`", file_path);
Stopwatch watch;
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.global_context.getSettingsRef());
try
@ -533,6 +535,10 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
ReadBufferFromFile in(file_path);
const auto & distributed_header = readDistributedHeader(in, log);
LOG_TRACE(log, "Started processing `{}` ({} rows, {} bytes)", file_path,
formatReadableQuantity(distributed_header.rows),
formatReadableSizeWithBinarySuffix(distributed_header.bytes));
auto connection = pool->get(timeouts, &distributed_header.insert_settings);
RemoteBlockOutputStream remote{*connection, timeouts,
distributed_header.insert_query,
@ -550,7 +556,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
markAsSend(file_path);
LOG_TRACE(log, "Finished processing `{}`", file_path);
LOG_TRACE(log, "Finished processing `{}` (took {} ms)", file_path, watch.elapsedMilliseconds());
}
struct StorageDistributedDirectoryMonitor::BatchHeader
@ -623,6 +629,12 @@ struct StorageDistributedDirectoryMonitor::Batch
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
Stopwatch watch;
LOG_TRACE(parent.log, "Sending a batch of {} files ({} rows, {} bytes).", file_indices.size(),
formatReadableQuantity(total_rows),
formatReadableSizeWithBinarySuffix(total_bytes));
if (!recovered)
{
/// For deduplication in Replicated tables to work, in case of error
@ -697,7 +709,7 @@ struct StorageDistributedDirectoryMonitor::Batch
if (!batch_broken)
{
LOG_TRACE(parent.log, "Sent a batch of {} files.", file_indices.size());
LOG_TRACE(parent.log, "Sent a batch of {} files (took {} ms).", file_indices.size(), watch.elapsedMilliseconds());
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, parent.disk, parent.relative_path);
for (UInt64 file_index : file_indices)

View File

@ -58,6 +58,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int TIMEOUT_EXCEEDED;
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
}
static Block adoptBlock(const Block & header, const Block & block, Poco::Logger * log)
@ -93,7 +94,7 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(
const ClusterPtr & cluster_,
bool insert_sync_,
UInt64 insert_timeout_)
: context(context_)
: context(std::make_unique<Context>(context_))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, query_ast(query_ast_)
@ -103,6 +104,10 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(
, insert_timeout(insert_timeout_)
, log(&Poco::Logger::get("DistributedBlockOutputStream"))
{
const auto & settings = context->getSettingsRef();
if (settings.max_distributed_depth && context->getClientInfo().distributed_depth > settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
context->getClientInfo().distributed_depth += 1;
}
@ -143,7 +148,7 @@ void DistributedBlockOutputStream::write(const Block & block)
void DistributedBlockOutputStream::writeAsync(const Block & block)
{
const Settings & settings = context.getSettingsRef();
const Settings & settings = context->getSettingsRef();
bool random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
if (random_shard_insert)
@ -194,7 +199,7 @@ std::string DistributedBlockOutputStream::getCurrentStateDescription()
void DistributedBlockOutputStream::initWritingJobs(const Block & first_block, size_t start, size_t end)
{
const Settings & settings = context.getSettingsRef();
const Settings & settings = context->getSettingsRef();
const auto & addresses_with_failovers = cluster->getShardsAddresses();
const auto & shards_info = cluster->getShardsInfo();
size_t num_shards = end - start;
@ -303,7 +308,7 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
}
const Block & shard_block = (num_shards > 1) ? job.current_shard_block : current_block;
const Settings & settings = context.getSettingsRef();
const Settings & settings = context->getSettingsRef();
/// Do not initiate INSERT for empty block.
if (shard_block.rows() == 0)
@ -343,7 +348,8 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
if (throttler)
job.connection_entry->setThrottler(throttler);
job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, timeouts, query_string, settings, context.getClientInfo());
job.stream = std::make_shared<RemoteBlockOutputStream>(
*job.connection_entry, timeouts, query_string, settings, context->getClientInfo());
job.stream->writePrefix();
}
@ -357,7 +363,7 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
if (!job.stream)
{
/// Forward user settings
job.local_context = std::make_unique<Context>(context);
job.local_context = std::make_unique<Context>(*context);
/// Copying of the query AST is required to avoid race,
/// in case of INSERT into multiple local shards.
@ -385,7 +391,7 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
void DistributedBlockOutputStream::writeSync(const Block & block)
{
const Settings & settings = context.getSettingsRef();
const Settings & settings = context->getSettingsRef();
const auto & shards_info = cluster->getShardsInfo();
bool random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
size_t start = 0;
@ -562,7 +568,7 @@ void DistributedBlockOutputStream::writeSplitAsync(const Block & block)
void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, size_t shard_id)
{
const auto & shard_info = cluster->getShardsInfo()[shard_id];
const auto & settings = context.getSettingsRef();
const auto & settings = context->getSettingsRef();
if (shard_info.hasInternalReplication())
{
@ -598,7 +604,7 @@ void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, size_t sh
void DistributedBlockOutputStream::writeToLocal(const Block & block, size_t repeats)
{
/// Async insert does not support settings forwarding yet whereas sync one supports
InterpreterInsertQuery interp(query_ast, context);
InterpreterInsertQuery interp(query_ast, *context);
auto block_io = interp.execute();
@ -610,7 +616,7 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, size_t repe
void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
{
const auto & settings = context.getSettingsRef();
const auto & settings = context->getSettingsRef();
const auto & distributed_settings = storage.getDistributedSettingsRef();
bool fsync = distributed_settings.fsync_after_insert;
@ -675,8 +681,8 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
WriteBufferFromOwnString header_buf;
writeVarUInt(DBMS_TCP_PROTOCOL_VERSION, header_buf);
writeStringBinary(query_string, header_buf);
context.getSettingsRef().write(header_buf);
context.getClientInfo().write(header_buf, DBMS_TCP_PROTOCOL_VERSION);
context->getSettingsRef().write(header_buf);
context->getClientInfo().write(header_buf, DBMS_TCP_PROTOCOL_VERSION);
writeVarUInt(block.rows(), header_buf);
writeVarUInt(block.bytes(), header_buf);
writeStringBinary(block.cloneEmpty().dumpStructure(), header_buf); /// obsolete
@ -730,7 +736,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
Poco::File(first_file_tmp_path).remove();
/// Notify
auto sleep_ms = context.getSettingsRef().distributed_directory_monitor_sleep_time_ms;
auto sleep_ms = context->getSettingsRef().distributed_directory_monitor_sleep_time_ms;
for (const auto & dir_name : dir_names)
{
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name);
@ -738,5 +744,4 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
}
}
}
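For reference, the header written above stores the protocol version, query, settings, client info, and the row/byte counts, which is what lets the directory monitor log sizes before deserializing the whole block. A hedged sketch of that layout, with fixed-width integers standing in for varints and the settings/client-info sections elided:

```cpp
#include <cstdint>
#include <ostream>
#include <string>

void writeShardFileHeader(std::ostream & out, uint64_t protocol_version,
                          const std::string & insert_query, uint64_t rows, uint64_t bytes)
{
    auto write_u64 = [&](uint64_t value)
    {
        out.write(reinterpret_cast<const char *>(&value), sizeof(value));
    };
    write_u64(protocol_version);      /// writeVarUInt(DBMS_TCP_PROTOCOL_VERSION, ...)
    write_u64(insert_query.size());   /// writeStringBinary length-prefixes the string
    out.write(insert_query.data(), static_cast<std::streamsize>(insert_query.size()));
    /// ... settings and client info follow here in the real format ...
    write_u64(rows);                  /// lets the monitor log "{} rows, {} bytes"
    write_u64(bytes);                 /// before the whole block is deserialized
}
```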

View File

@ -84,7 +84,7 @@ private:
std::string getCurrentStateDescription();
private:
const Context & context;
std::unique_ptr<Context> context;
StorageDistributed & storage;
StorageMetadataPtr metadata_snapshot;
ASTPtr query_ast;

View File

@ -48,38 +48,13 @@ void TemporaryLiveViewCleaner::init(Context & global_context_)
the_instance.reset(new TemporaryLiveViewCleaner(global_context_));
}
void TemporaryLiveViewCleaner::startupIfNecessary()
void TemporaryLiveViewCleaner::startup()
{
background_thread_can_start = true;
std::lock_guard lock{mutex};
if (background_thread_should_exit)
return;
if (!views.empty())
startupIfNecessaryImpl(lock);
else
can_start_background_thread = true;
}
void TemporaryLiveViewCleaner::startupIfNecessaryImpl(const std::lock_guard<std::mutex> &)
{
/// If views.empty() the background thread isn't running or it's going to stop right now.
/// If can_start_background_thread is false, then the thread has not been started previously.
bool background_thread_is_running;
if (can_start_background_thread)
{
background_thread_is_running = !views.empty();
}
else
{
can_start_background_thread = true;
background_thread_is_running = false;
}
if (!background_thread_is_running)
{
if (background_thread.joinable())
background_thread.join();
background_thread = ThreadFromGlobalPool{&TemporaryLiveViewCleaner::backgroundThreadFunc, this};
}
startBackgroundThread();
}
void TemporaryLiveViewCleaner::shutdown()
@ -87,13 +62,11 @@ void TemporaryLiveViewCleaner::shutdown()
the_instance.reset();
}
TemporaryLiveViewCleaner::TemporaryLiveViewCleaner(Context & global_context_)
: global_context(global_context_)
{
}
TemporaryLiveViewCleaner::~TemporaryLiveViewCleaner()
{
stopBackgroundThread();
@ -108,27 +81,29 @@ void TemporaryLiveViewCleaner::addView(const std::shared_ptr<StorageLiveView> &
auto current_time = std::chrono::system_clock::now();
auto time_of_next_check = current_time + view->getTimeout();
std::lock_guard lock{mutex};
if (background_thread_should_exit)
return;
if (can_start_background_thread)
startupIfNecessaryImpl(lock);
/// Keep the vector `views` sorted by time of next check.
StorageAndTimeOfCheck storage_and_time_of_check{view, time_of_next_check};
std::lock_guard lock{mutex};
views.insert(std::upper_bound(views.begin(), views.end(), storage_and_time_of_check), storage_and_time_of_check);
background_thread_wake_up.notify_one();
if (background_thread_can_start)
{
startBackgroundThread();
background_thread_wake_up.notify_one();
}
}
void TemporaryLiveViewCleaner::backgroundThreadFunc()
{
std::unique_lock lock{mutex};
while (!background_thread_should_exit && !views.empty())
while (!background_thread_should_exit)
{
background_thread_wake_up.wait_until(lock, views.front().time_of_check);
if (views.empty())
background_thread_wake_up.wait(lock);
else
background_thread_wake_up.wait_until(lock, views.front().time_of_check);
if (background_thread_should_exit)
break;
@ -173,14 +148,18 @@ void TemporaryLiveViewCleaner::backgroundThreadFunc()
}
void TemporaryLiveViewCleaner::startBackgroundThread()
{
if (!background_thread.joinable() && background_thread_can_start && !background_thread_should_exit)
background_thread = ThreadFromGlobalPool{&TemporaryLiveViewCleaner::backgroundThreadFunc, this};
}
void TemporaryLiveViewCleaner::stopBackgroundThread()
{
background_thread_should_exit = true;
background_thread_wake_up.notify_one();
if (background_thread.joinable())
{
background_thread_should_exit = true;
background_thread_wake_up.notify_one();
background_thread.join();
}
}
}

View File

@ -23,8 +23,7 @@ public:
static void init(Context & global_context_);
static void shutdown();
void startupIfNecessary();
void startupIfNecessaryImpl(const std::lock_guard<std::mutex> &);
void startup();
private:
friend std::unique_ptr<TemporaryLiveViewCleaner>::deleter_type;
@ -33,6 +32,7 @@ private:
~TemporaryLiveViewCleaner();
void backgroundThreadFunc();
void startBackgroundThread();
void stopBackgroundThread();
struct StorageAndTimeOfCheck
@ -47,7 +47,7 @@ private:
std::mutex mutex;
std::vector<StorageAndTimeOfCheck> views;
ThreadFromGlobalPool background_thread;
bool can_start_background_thread = false;
std::atomic<bool> background_thread_can_start = false;
std::atomic<bool> background_thread_should_exit = false;
std::condition_variable background_thread_wake_up;
};
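The new lifecycle is: startup() flips `background_thread_can_start`, the thread is launched lazily (on startup, or when the first view is added), and shutdown joins it exactly once. A simplified sketch of that start/stop protocol, using std::thread in place of ThreadFromGlobalPool and omitting the view list:

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

class CleanerSketch
{
public:
    void startup()
    {
        can_start = true;
        std::lock_guard lock{mutex};
        startThreadIfNeeded();
    }

    ~CleanerSketch()
    {
        if (thread.joinable())
        {
            should_exit = true;
            wake_up.notify_one();
            thread.join();
        }
    }

private:
    void startThreadIfNeeded()
    {
        /// At most one thread, only after startup(), never once shutdown began.
        if (!thread.joinable() && can_start && !should_exit)
            thread = std::thread([this] { threadFunc(); });
    }

    void threadFunc()
    {
        std::unique_lock lock{mutex};
        while (!should_exit)
            wake_up.wait(lock); /// the real code waits until the next view check time
    }

    std::mutex mutex;
    std::thread thread;
    std::condition_variable wake_up;
    std::atomic<bool> can_start{false};
    std::atomic<bool> should_exit{false};
};
```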

View File

@ -1360,8 +1360,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
for (const auto & plan : plans)
input_streams.emplace_back(plan->getCurrentDataStream());
const auto & common_header = plans.front()->getCurrentDataStream().header;
auto union_step = std::make_unique<UnionStep>(std::move(input_streams), common_header);
auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
auto plan = std::make_unique<QueryPlan>();
plan->unitePlans(std::move(union_step), std::move(plans));

View File

@ -44,12 +44,11 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
for (const auto & range : all_mark_ranges)
total_marks_count += range.end - range.begin;
size_t total_rows = data_part->index_granularity.getTotalRows();
size_t total_rows = data_part->index_granularity.getRowsCountInRanges(all_mark_ranges);
if (!quiet)
LOG_TRACE(log, "Reading {} ranges in reverse order from part {}, approx. {}, up to {} rows starting from {}",
LOG_TRACE(log, "Reading {} ranges in reverse order from part {}, approx. {} rows starting from {}",
all_mark_ranges.size(), data_part->name, total_rows,
data_part->index_granularity.getRowsCountInRanges(all_mark_ranges),
data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin));
addTotalRowsApprox(total_rows);

View File

@ -341,13 +341,28 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
/// If it exists on our replica, ignore it.
if (storage.getActiveContainingPart(existing_part_name))
{
LOG_INFO(log, "Block with ID {} already exists locally as part {}; ignoring it.", block_id, existing_part_name);
part->is_duplicate = true;
last_block_is_duplicate = true;
ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
if (quorum)
{
LOG_INFO(log, "Block with ID {} already exists locally as part {}; ignoring it, but checking quorum.", block_id, existing_part_name);
std::string quorum_path;
if (quorum_parallel)
quorum_path = storage.zookeeper_path + "/quorum/parallel/" + existing_part_name;
else
quorum_path = storage.zookeeper_path + "/quorum/status";
waitForQuorum(zookeeper, existing_part_name, quorum_path, quorum_info.is_active_node_value);
}
else
{
LOG_INFO(log, "Block with ID {} already exists locally as part {}; ignoring it.", block_id, existing_part_name);
}
return;
}
LOG_INFO(log, "Block with ID {} already exists on other replicas as part {}; will write it locally with that name.",
block_id, existing_part_name);
@ -486,50 +501,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
storage.updateQuorum(part->name, false);
}
/// We are waiting for quorum to be satisfied.
LOG_TRACE(log, "Waiting for quorum");
try
{
while (true)
{
zkutil::EventPtr event = std::make_shared<Poco::Event>();
std::string value;
/// `get` instead of `exists` so that `watch` does not leak if the node is no longer there.
if (!zookeeper->tryGet(quorum_info.status_path, value, nullptr, event))
break;
LOG_TRACE(log, "Quorum node {} still exists, will wait for updates", quorum_info.status_path);
ReplicatedMergeTreeQuorumEntry quorum_entry(value);
/// If the node has time to disappear, and then appear again for the next insert.
if (quorum_entry.part_name != part->name)
break;
if (!event->tryWait(quorum_timeout_ms))
throw Exception("Timeout while waiting for quorum", ErrorCodes::TIMEOUT_EXCEEDED);
LOG_TRACE(log, "Quorum {} updated, will check quorum node still exists", quorum_info.status_path);
}
/// And what if it is possible that the current replica at this time has ceased to be active
/// and the quorum is marked as failed and deleted?
String value;
if (!zookeeper->tryGet(storage.replica_path + "/is_active", value, nullptr)
|| value != quorum_info.is_active_node_value)
throw Exception("Replica become inactive while waiting for quorum", ErrorCodes::NO_ACTIVE_REPLICAS);
}
catch (...)
{
/// We do not know whether or not data has been inserted
/// - whether other replicas have time to download the part and mark the quorum as done.
throw Exception("Unknown status, client must retry. Reason: " + getCurrentExceptionMessage(false),
ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
}
LOG_TRACE(log, "Quorum satisfied");
waitForQuorum(zookeeper, part->name, quorum_info.status_path, quorum_info.is_active_node_value);
}
}
@ -541,4 +513,57 @@ void ReplicatedMergeTreeBlockOutputStream::writePrefix()
}
void ReplicatedMergeTreeBlockOutputStream::waitForQuorum(
zkutil::ZooKeeperPtr & zookeeper,
const std::string & part_name,
const std::string & quorum_path,
const std::string & is_active_node_value) const
{
/// We are waiting for quorum to be satisfied.
LOG_TRACE(log, "Waiting for quorum");
try
{
while (true)
{
zkutil::EventPtr event = std::make_shared<Poco::Event>();
std::string value;
/// `get` instead of `exists` so that `watch` does not leak if the node is no longer there.
if (!zookeeper->tryGet(quorum_path, value, nullptr, event))
break;
LOG_TRACE(log, "Quorum node {} still exists, will wait for updates", quorum_path);
ReplicatedMergeTreeQuorumEntry quorum_entry(value);
/// If the node has time to disappear, and then appear again for the next insert.
if (quorum_entry.part_name != part_name)
break;
if (!event->tryWait(quorum_timeout_ms))
throw Exception("Timeout while waiting for quorum", ErrorCodes::TIMEOUT_EXCEEDED);
LOG_TRACE(log, "Quorum {} updated, will check quorum node still exists", quorum_path);
}
/// And what if it is possible that the current replica at this time has ceased to be active
/// and the quorum is marked as failed and deleted?
String value;
if (!zookeeper->tryGet(storage.replica_path + "/is_active", value, nullptr)
|| value != is_active_node_value)
throw Exception("Replica become inactive while waiting for quorum", ErrorCodes::NO_ACTIVE_REPLICAS);
}
catch (...)
{
/// We do not know whether or not data has been inserted
/// - whether other replicas have time to download the part and mark the quorum as done.
throw Exception("Unknown status, client must retry. Reason: " + getCurrentExceptionMessage(false),
ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
}
LOG_TRACE(log, "Quorum satisfied");
}
}

View File

@ -63,6 +63,12 @@ private:
/// Rename temporary part and commit to ZooKeeper.
void commitPart(zkutil::ZooKeeperPtr & zookeeper, MergeTreeData::MutableDataPartPtr & part, const String & block_id);
/// Wait for quorum to be satisfied on path (quorum_path) for part (part_name).
/// Also checks that the replica is still alive.
void waitForQuorum(
zkutil::ZooKeeperPtr & zookeeper, const std::string & part_name,
const std::string & quorum_path, const std::string & is_active_node_value) const;
StorageReplicatedMergeTree & storage;
StorageMetadataPtr metadata_snapshot;
size_t quorum;

View File

@ -423,7 +423,7 @@ void StorageBuffer::read(
plans.emplace_back(std::make_unique<QueryPlan>(std::move(buffers_plan)));
query_plan = QueryPlan();
auto union_step = std::make_unique<UnionStep>(std::move(input_streams), result_header);
auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
union_step->setStepDescription("Unite sources from Buffer table");
query_plan.unitePlans(std::move(union_step), std::move(plans));
}

View File

@ -697,7 +697,7 @@ QueryPipelinePtr StorageDistributed::distributedWrite(const ASTInsertQuery & que
}
return std::make_unique<QueryPipeline>(
QueryPipeline::unitePipelines(std::move(pipelines), {}, ExpressionActionsSettings::fromContext(context)));
QueryPipeline::unitePipelines(std::move(pipelines)));
}

View File

@ -334,12 +334,11 @@ class ClickhouseIntegrationTestsRunner:
logging.info("Task timeout exceeded, skipping %s", test)
counters["SKIPPED"].append(test)
tests_times[test] = 0
log_name = None
log_path = None
return counters, tests_times, log_name, log_path
return counters, tests_times, []
image_cmd = self._get_runner_image_cmd(repo_path)
test_group_str = test_group.replace('/', '_').replace('.', '_')
log_paths = []
for i in range(num_tries):
logging.info("Running test group %s for the %s retry", test_group, i)
@ -348,6 +347,7 @@ class ClickhouseIntegrationTestsRunner:
output_path = os.path.join(str(self.path()), "test_output_" + test_group_str + "_" + str(i) + ".log")
log_name = "integration_run_" + test_group_str + "_" + str(i) + ".txt"
log_path = os.path.join(str(self.path()), log_name)
log_paths.append(log_path)
logging.info("Will wait output inside %s", output_path)
test_names = set([])
@ -390,7 +390,7 @@ class ClickhouseIntegrationTestsRunner:
if test not in counters["PASSED"] and test not in counters["ERROR"] and test not in counters["FAILED"]:
counters["ERROR"].append(test)
return counters, tests_times, log_name, log_path
return counters, tests_times, log_paths
def run_flaky_check(self, repo_path, build_path):
pr_info = self.params['pr_info']
@ -408,12 +408,12 @@ class ClickhouseIntegrationTestsRunner:
start = time.time()
logging.info("Starting check with retries")
final_retry = 0
log_paths = []
logs = []
for i in range(TRIES_COUNT):
final_retry += 1
logging.info("Running tests for the %s time", i)
counters, tests_times, _, log_path = self.run_test_group(repo_path, "flaky", tests_to_run, 1)
log_paths.append(log_path)
counters, tests_times, log_paths = self.run_test_group(repo_path, "flaky", tests_to_run, 1)
logs += log_paths
if counters["FAILED"]:
logging.info("Found failed tests: %s", ' '.join(counters["FAILED"]))
description_prefix = "Flaky tests found: "
@ -450,7 +450,7 @@ class ClickhouseIntegrationTestsRunner:
test_result += [(c + ' (✕' + str(final_retry) + ')', text_state, "{:.2f}".format(tests_times[c])) for c in counters[state]]
status_text = description_prefix + ', '.join([str(n).lower().replace('failed', 'fail') + ': ' + str(len(c)) for n, c in counters.items()])
return result_state, status_text, test_result, [test_logs] + log_paths
return result_state, status_text, test_result, [test_logs] + logs
def run_impl(self, repo_path, build_path):
if self.flaky_check:
@ -471,8 +471,8 @@ class ClickhouseIntegrationTestsRunner:
"FLAKY": [],
}
tests_times = defaultdict(float)
tests_log_paths = defaultdict(list)
logs = []
items_to_run = list(grouped_tests.items())
logging.info("Total test groups %s", len(items_to_run))
@ -482,7 +482,7 @@ class ClickhouseIntegrationTestsRunner:
for group, tests in items_to_run:
logging.info("Running test group %s countaining %s tests", group, len(tests))
group_counters, group_test_times, _, log_path = self.run_test_group(repo_path, group, tests, MAX_RETRY)
group_counters, group_test_times, log_paths = self.run_test_group(repo_path, group, tests, MAX_RETRY)
total_tests = 0
for counter, value in group_counters.items():
logging.info("Tests from group %s stats, %s count %s", group, counter, len(value))
@ -493,7 +493,8 @@ class ClickhouseIntegrationTestsRunner:
for test_name, test_time in group_test_times.items():
tests_times[test_name] = test_time
logs.append(log_path)
tests_log_paths[test_name] = log_paths
if len(counters["FAILED"]) + len(counters["ERROR"]) >= 20:
logging.info("Collected more than 20 failed/error tests, stopping")
break
@ -518,7 +519,7 @@ class ClickhouseIntegrationTestsRunner:
text_state = "FAIL"
else:
text_state = state
test_result += [(c, text_state, "{:.2f}".format(tests_times[c])) for c in counters[state]]
test_result += [(c, text_state, "{:.2f}".format(tests_times[c]), tests_log_paths[c]) for c in counters[state]]
failed_sum = len(counters['FAILED']) + len(counters['ERROR'])
status_text = "fail: {}, passed: {}, flaky: {}".format(failed_sum, len(counters['PASSED']), len(counters['FLAKY']))
@ -535,7 +536,7 @@ class ClickhouseIntegrationTestsRunner:
if '(memory)' in self.params['context_name']:
result_state = "success"
return result_state, status_text, test_result, [test_logs] + logs
return result_state, status_text, test_result, [test_logs]
def write_results(results_file, status_file, results, status):
with open(results_file, 'w') as f:
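The refactor above replaces the single (log_name, log_path) pair with a list of per-retry log paths that callers accumulate across groups. A trimmed-down sketch of the new contract (a hypothetical simplified signature; the real method also fills counters and per-test timings):

def run_test_group(group: str, num_tries: int):
    # hypothetical stand-in for ClickhouseIntegrationTestsRunner.run_test_group
    log_paths = []
    for i in range(num_tries):
        log_paths.append('integration_run_%s_%s.txt' % (group, i))
    counters, tests_times = {'PASSED': [], 'FAILED': []}, {}
    return counters, tests_times, log_paths

logs = []
counters, tests_times, log_paths = run_test_group('flaky', 3)
logs += log_paths  # callers now keep every retry's log, not just the last one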

View File

@ -13,12 +13,12 @@
<query short='1' tag='ANY LEFT IN'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042)</query>
<query tag='INNER'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 = 20042</query>
<query tag='INNER KEY'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042</query>
<query tag='INNER KEY'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='INNER ON'>SELECT COUNT() FROM ints l INNER JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042</query>
<query tag='INNER IN'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042)</query>
<query tag='LEFT'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 = 20042</query>
<query tag='LEFT KEY'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042</query>
<query tag='LEFT KEY'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='LEFT ON'>SELECT COUNT() FROM ints l LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042</query>
<query tag='LEFT IN'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042)</query>

View File

@ -3,53 +3,54 @@
<settings>
<join_algorithm>partial_merge</join_algorithm>
<query_plan_filter_push_down>0</query_plan_filter_push_down>
</settings>
<fill_query>INSERT INTO ints SELECT number AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000)</fill_query>
<fill_query>INSERT INTO ints SELECT 10000 + number % 1000 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000)</fill_query>
<fill_query>INSERT INTO ints SELECT 20000 + number % 100 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000)</fill_query>
<fill_query>INSERT INTO ints SELECT 30000 + number % 10 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000)</fill_query>
<fill_query>INSERT INTO ints SELECT 40000 + number % 1 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000)</fill_query>
<fill_query>INSERT INTO ints SELECT number AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000) settings query_plan_filter_push_down = 0</fill_query>
<fill_query>INSERT INTO ints SELECT 10000 + number % 1000 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000) settings query_plan_filter_push_down = 0</fill_query>
<fill_query>INSERT INTO ints SELECT 20000 + number % 100 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000) settings query_plan_filter_push_down = 0</fill_query>
<fill_query>INSERT INTO ints SELECT 30000 + number % 10 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000) settings query_plan_filter_push_down = 0</fill_query>
<fill_query>INSERT INTO ints SELECT 40000 + number % 1 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000) settings query_plan_filter_push_down = 0</fill_query>
<query short='1' tag='ANY LEFT'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 = 20042</query>
<query short='1' tag='ANY LEFT KEY'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042</query>
<query short='1' tag='ANY LEFT ON'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042</query>
<query short='1' tag='ANY LEFT IN'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042)</query>
<query short='1' tag='ANY LEFT'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query short='1' tag='ANY LEFT KEY'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query short='1' tag='ANY LEFT ON'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query short='1' tag='ANY LEFT IN'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042) settings query_plan_filter_push_down = 0</query>
<query tag='INNER'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 = 20042</query>
<query tag='INNER KEY'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042</query>
<query tag='INNER ON'>SELECT COUNT() FROM ints l INNER JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042</query>
<query tag='INNER IN'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042)</query>
<query tag='INNER'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='INNER KEY'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='INNER ON'>SELECT COUNT() FROM ints l INNER JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='INNER IN'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042) settings query_plan_filter_push_down = 0</query>
<query tag='LEFT'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 = 20042</query>
<query tag='LEFT KEY'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042</query>
<query tag='LEFT ON'>SELECT COUNT() FROM ints l LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042</query>
<query tag='LEFT IN'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042)</query>
<query tag='LEFT'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='LEFT KEY'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='LEFT ON'>SELECT COUNT() FROM ints l LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='LEFT IN'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042) settings query_plan_filter_push_down = 0</query>
<query short='1' tag='ANY LEFT (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0</query>
<query short='1' tag='ANY LEFT KEY (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0</query>
<query short='1' tag='ANY LEFT ON (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0</query>
<query short='1' tag='ANY LEFT IN (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042) SETTINGS partial_merge_join_optimizations = 0</query>
<query short='1' tag='ANY LEFT (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query short='1' tag='ANY LEFT KEY (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query short='1' tag='ANY LEFT ON (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query short='1' tag='ANY LEFT IN (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042) SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query tag='INNER (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='INNER KEY (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='INNER ON (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='INNER IN (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042) SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='INNER (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query tag='INNER KEY (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query tag='INNER ON (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query tag='INNER IN (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042) SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query tag='LEFT (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='LEFT KEY (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='LEFT ON (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='LEFT IN (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042) SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='LEFT (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query tag='LEFT KEY (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query tag='LEFT ON (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042 SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query tag='LEFT IN (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042) SETTINGS partial_merge_join_optimizations = 0, query_plan_filter_push_down = 0</query>
<query tag='RIGHT'>SELECT COUNT() FROM ints l RIGHT JOIN ints r USING i64 WHERE i32 = 20042</query>
<query tag='RIGHT KEY'>SELECT COUNT() FROM ints l RIGHT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042</query>
<query tag='RIGHT ON'>SELECT COUNT() FROM ints l RIGHT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042</query>
<query tag='RIGHT IN'>SELECT COUNT() FROM ints l RIGHT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042)</query>
<query tag='RIGHT'>SELECT COUNT() FROM ints l RIGHT JOIN ints r USING i64 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='RIGHT KEY'>SELECT COUNT() FROM ints l RIGHT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='RIGHT ON'>SELECT COUNT() FROM ints l RIGHT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='RIGHT IN'>SELECT COUNT() FROM ints l RIGHT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042) settings query_plan_filter_push_down = 0</query>
<query tag='FULL'>SELECT COUNT() FROM ints l FULL JOIN ints r USING i64 WHERE i32 = 20042</query>
<query tag='FULL KEY'>SELECT COUNT() FROM ints l FULL JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042</query>
<query tag='FULL ON'>SELECT COUNT() FROM ints l FULL JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042</query>
<query tag='FULL IN'>SELECT COUNT() FROM ints l FULL JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042)</query>
<query tag='FULL'>SELECT COUNT() FROM ints l FULL JOIN ints r USING i64 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='FULL KEY'>SELECT COUNT() FROM ints l FULL JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='FULL ON'>SELECT COUNT() FROM ints l FULL JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='FULL IN'>SELECT COUNT() FROM ints l FULL JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042) settings query_plan_filter_push_down = 0</query>
<drop_query>DROP TABLE IF EXISTS ints</drop_query>
</test>
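These benchmarks pin query_plan_filter_push_down = 0 so the timings keep measuring the JOIN itself rather than the new filter push-down optimization. One way to see whether push-down fires for a given query (an illustrative sketch using the clickhouse-driver Python client against an assumed local server with the ints table):

from clickhouse_driver import Client

client = Client(host='localhost')  # assumed local server
plan = client.execute(
    "EXPLAIN SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 "
    "WHERE i32 = 20042 SETTINGS query_plan_filter_push_down = 1")
# With push-down enabled, a Filter step appears below the Join in the plan.
print('\n'.join(row[0] for row in plan))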

View File

@ -5,13 +5,13 @@ DROP TABLE IF EXISTS distr2;
CREATE TABLE distr (x UInt8) ENGINE = Distributed(test_shard_localhost, currentDatabase(), distr); -- { serverError 269 }
CREATE TABLE distr0 (x UInt8) ENGINE = Distributed(test_shard_localhost, '', distr0);
SELECT * FROM distr0; -- { serverError 306 }
SELECT * FROM distr0; -- { serverError 581 }
CREATE TABLE distr1 (x UInt8) ENGINE = Distributed(test_shard_localhost, currentDatabase(), distr2);
CREATE TABLE distr2 (x UInt8) ENGINE = Distributed(test_shard_localhost, currentDatabase(), distr1);
SELECT * FROM distr1; -- { serverError 306 }
SELECT * FROM distr2; -- { serverError 306 }
SELECT * FROM distr1; -- { serverError 581 }
SELECT * FROM distr2; -- { serverError 581 }
DROP TABLE distr0;
DROP TABLE distr1;

View File

@ -23,7 +23,7 @@ set is_done 0
while {$is_done == 0} {
send -- "\t"
expect {
"_connections" {
"_" {
set is_done 1
}
default {

View File

@ -1,16 +1,16 @@
DROP TABLE IF EXISTS r1;
DROP TABLE IF EXISTS r2;
DROP TABLE IF EXISTS r1 SYNC;
DROP TABLE IF EXISTS r2 SYNC;
CREATE TABLE r1 (
key UInt64, value String
)
ENGINE = ReplicatedMergeTree('/clickhouse/01509_no_repliacs', '1')
ENGINE = ReplicatedMergeTree('/clickhouse/01509_parallel_quorum_insert_no_replicas', '1')
ORDER BY tuple();
CREATE TABLE r2 (
key UInt64, value String
)
ENGINE = ReplicatedMergeTree('/clickhouse/01509_no_repliacs', '2')
ENGINE = ReplicatedMergeTree('/clickhouse/01509_parallel_quorum_insert_no_replicas', '2')
ORDER BY tuple();
SET insert_quorum_parallel=1;
@ -18,8 +18,13 @@ SET insert_quorum_parallel=1;
SET insert_quorum=3;
INSERT INTO r1 VALUES(1, '1'); --{serverError 285}
-- retry should still fail despite insert_deduplicate being enabled
INSERT INTO r1 VALUES(1, '1'); --{serverError 285}
INSERT INTO r1 VALUES(1, '1'); --{serverError 285}
SELECT 'insert to two replicas works';
SET insert_quorum=2, insert_quorum_parallel=1;
INSERT INTO r1 VALUES(1, '1');
SELECT COUNT() FROM r1;
@ -29,12 +34,18 @@ DETACH TABLE r2;
INSERT INTO r1 VALUES(2, '2'); --{serverError 285}
-- retry should fail despite insert_deduplicate being enabled
INSERT INTO r1 VALUES(2, '2'); --{serverError 285}
INSERT INTO r1 VALUES(2, '2'); --{serverError 285}
SET insert_quorum=1, insert_quorum_parallel=1;
SELECT 'insert to single replica works';
INSERT INTO r1 VALUES(2, '2');
ATTACH TABLE r2;
INSERT INTO r2 VALUES(2, '2');
SYSTEM SYNC REPLICA r2;
SET insert_quorum=2, insert_quorum_parallel=1;
@ -47,6 +58,17 @@ SELECT COUNT() FROM r2;
SELECT 'deduplication works';
INSERT INTO r2 VALUES(3, '3');
-- still works if we relax quorum
SET insert_quorum=1, insert_quorum_parallel=1;
INSERT INTO r2 VALUES(3, '3');
INSERT INTO r1 VALUES(3, '3');
-- will start failing if we increase quorum
SET insert_quorum=3, insert_quorum_parallel=1;
INSERT INTO r1 VALUES(3, '3'); --{serverError 285}
-- works again when quorum=2
SET insert_quorum=2, insert_quorum_parallel=1;
INSERT INTO r2 VALUES(3, '3');
SELECT COUNT() FROM r1;
SELECT COUNT() FROM r2;
@ -56,8 +78,18 @@ SET insert_quorum_timeout=0;
INSERT INTO r1 VALUES (4, '4'); -- { serverError 319 }
-- retry should fail despite insert_deduplicate being enabled
INSERT INTO r1 VALUES (4, '4'); -- { serverError 319 }
INSERT INTO r1 VALUES (4, '4'); -- { serverError 319 }
SELECT * FROM r2 WHERE key=4;
SYSTEM START FETCHES r2;
SET insert_quorum_timeout=6000000;
-- now retry should be successful
INSERT INTO r1 VALUES (4, '4');
SYSTEM SYNC REPLICA r2;
SELECT 'insert happened';

View File

@ -123,3 +123,26 @@ Filter column: notEquals(y, 2)
3 10
0 37
> filter is pushed down before CreatingSets
CreatingSets
Filter
Filter
1
3
> one condition of filter is pushed down before LEFT JOIN
Join
Filter column: notEquals(number, 1)
Join
0 0
3 3
> one condition of filter is pushed down before INNER JOIN
Join
Filter column: notEquals(number, 1)
Join
3 3
> filter is pushed down before UNION
Union
Filter
Filter
2 3
2 3

View File

@ -150,3 +150,49 @@ $CLICKHOUSE_CLIENT -q "
select * from (
select y, sum(x) from (select number as x, number % 4 as y from numbers(10)) group by y with totals
) where y != 2"
echo "> filter is pushed down before CreatingSets"
$CLICKHOUSE_CLIENT -q "
explain select number from (
select number from numbers(5) where number in (select 1 + number from numbers(3))
) where number != 2 settings enable_optimize_predicate_expression=0" |
grep -o "CreatingSets\|Filter"
$CLICKHOUSE_CLIENT -q "
select number from (
select number from numbers(5) where number in (select 1 + number from numbers(3))
) where number != 2 settings enable_optimize_predicate_expression=0"
echo "> one condition of filter is pushed down before LEFT JOIN"
$CLICKHOUSE_CLIENT -q "
explain actions = 1
select number as a, r.b from numbers(4) as l any left join (
select number + 2 as b from numbers(3)
) as r on a = r.b where a != 1 and b != 2 settings enable_optimize_predicate_expression = 0" |
grep -o "Join\|Filter column: notEquals(number, 1)"
$CLICKHOUSE_CLIENT -q "
select number as a, r.b from numbers(4) as l any left join (
select number + 2 as b from numbers(3)
) as r on a = r.b where a != 1 and b != 2 settings enable_optimize_predicate_expression = 0"
echo "> one condition of filter is pushed down before INNER JOIN"
$CLICKHOUSE_CLIENT -q "
explain actions = 1
select number as a, r.b from numbers(4) as l any inner join (
select number + 2 as b from numbers(3)
) as r on a = r.b where a != 1 and b != 2 settings enable_optimize_predicate_expression = 0" |
grep -o "Join\|Filter column: notEquals(number, 1)"
$CLICKHOUSE_CLIENT -q "
select number as a, r.b from numbers(4) as l any inner join (
select number + 2 as b from numbers(3)
) as r on a = r.b where a != 1 and b != 2 settings enable_optimize_predicate_expression = 0"
echo "> filter is pushed down before UNION"
$CLICKHOUSE_CLIENT -q "
explain select a, b from (
select number + 1 as a, number + 2 as b from numbers(2) union all select number + 1 as b, number + 2 as a from numbers(2)
) where a != 1 settings enable_optimize_predicate_expression = 0" |
grep -o "Union\|Filter"
$CLICKHOUSE_CLIENT -q "
select a, b from (
select number + 1 as a, number + 2 as b from numbers(2) union all select number + 1 as b, number + 2 as a from numbers(2)
) where a != 1 settings enable_optimize_predicate_expression = 0"

View File

@ -0,0 +1,26 @@
DROP TABLE IF EXISTS tt6;
CREATE TABLE tt6
(
`id` UInt32,
`first_column` UInt32,
`second_column` UInt32,
`third_column` UInt32,
`status` String
)
ENGINE = Distributed('test_shard_localhost', '', 'tt6', rand());
INSERT INTO tt6 VALUES (1, 1, 1, 1, 'ok'); -- { serverError 581 }
SELECT * FROM tt6; -- { serverError 581 }
SET max_distributed_depth = 0;
-- stack overflow
INSERT INTO tt6 VALUES (1, 1, 1, 1, 'ok'); -- { serverError 306}
-- stack overflow
SELECT * FROM tt6; -- { serverError 306 }
DROP TABLE tt6;

View File

@ -0,0 +1,2 @@
1 1 2
1 \N

View File

@ -0,0 +1,17 @@
DROP TABLE IF EXISTS t_having;
CREATE TABLE t_having (c0 Int32, c1 UInt64) ENGINE = Memory;
INSERT INTO t_having SELECT number, number FROM numbers(1000);
SELECT sum(c0 = 0), min(c0 + 1), sum(c0 + 2) FROM t_having
GROUP BY c0 HAVING c0 = 0
SETTINGS enable_optimize_predicate_expression=0;
SELECT c0 + -1, sum(intDivOrZero(intDivOrZero(NULL, NULL), '2'), intDivOrZero(10000000000., intDivOrZero(intDivOrZero(intDivOrZero(NULL, NULL), 10), NULL))) FROM t_having GROUP BY c0 = 2, c0 = 10, intDivOrZero(intDivOrZero(intDivOrZero(NULL, NULL), NULL), NULL), c0 HAVING c0 = 2 SETTINGS enable_optimize_predicate_expression = 0;
SELECT sum(c0 + 257) FROM t_having GROUP BY c0 = -9223372036854775808, NULL, -2147483649, c0 HAVING c0 = -9223372036854775808 SETTINGS enable_optimize_predicate_expression = 0;
SELECT c0 + -2, c0 + -9223372036854775807, c0 = NULL FROM t_having GROUP BY c0 = 0.9998999834060669, 1023, c0 HAVING c0 = 0.9998999834060669 SETTINGS enable_optimize_predicate_expression = 0;
DROP TABLE t_having;

View File

@ -0,0 +1,10 @@
drop table if exists data_01801;
create table data_01801 (key Int) engine=MergeTree() order by key settings index_granularity=10 as select number/10 from numbers(100);
select * from data_01801 where key = 0 order by key settings max_rows_to_read=9 format Null; -- { serverError 158 }
select * from data_01801 where key = 0 order by key desc settings max_rows_to_read=9 format Null; -- { serverError 158 }
select * from data_01801 where key = 0 order by key settings max_rows_to_read=10 format Null;
select * from data_01801 where key = 0 order by key desc settings max_rows_to_read=10 format Null;
drop table data_01801;

View File

@ -0,0 +1 @@
SELECT DISTINCT a FROM remote('127.0.0.{1,2,3}', values('a UInt8, b UInt8', (1, 2), (1, 3))) GROUP BY a, b;

View File

@ -0,0 +1,22 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS nullable_low_cardinality_tsv_test;";
$CLICKHOUSE_CLIENT --multiquery --query="CREATE TABLE nullable_low_cardinality_tsv_test
(
A Date,
S LowCardinality(Nullable(String)),
X Int32,
S1 LowCardinality(Nullable(String)),
S2 Array(String)
) ENGINE=TinyLog";
printf '2020-01-01\t\N\t32\t\N\n' | $CLICKHOUSE_CLIENT -q 'insert into nullable_low_cardinality_tsv_test format TSV' 2>&1 \
| grep -q "Code: 27"
echo $?;
$CLICKHOUSE_CLIENT --query="DROP TABLE nullable_low_cardinality_tsv_test";

Some files were not shown because too many files have changed in this diff