Merge branch 'master' into fix_test_00002
commit 13da43ead8

contrib/azure (vendored)
@@ -1 +1 @@
Subproject commit e4fcdfc81e337e589ce231a452dcc280fcbb3f99
Subproject commit 096049bf24fffafcaccc132b9367694532716731

@@ -21,5 +21,3 @@ RUN yarn config set registry https://registry.npmjs.org \
COPY run.sh /run.sh

ENTRYPOINT ["/run.sh"]

CMD ["yarn", "build"]

@@ -25,7 +25,8 @@ done
sed -i '/onBrokenMarkdownLinks:/ s/ignore/error/g' docusaurus.config.js

if [[ $# -lt 1 ]] || [[ "$1" == "--"* ]]; then
export CI=true
export CI=true
yarn install
exec yarn build "$@"
fi

docs/en/engines/table-engines/special/executable.md (new file)
@@ -0,0 +1,226 @@
---
slug: /en/engines/table-engines/special/executable
sidebar_position: 40
sidebar_label: Executable
---

# Executable and ExecutablePool Table Engines

The `Executable` and `ExecutablePool` table engines allow you to define a table whose rows are generated from a script that you define (by writing rows to **stdout**). The executable script is stored in the `user_scripts` directory and can read data from any source.

- `Executable` tables: the script is run on every query
- `ExecutablePool` tables: a pool of persistent processes is maintained, and processes are taken from the pool for reads

You can optionally include one or more input queries that stream their results to **stdin** for the script to read.

## Creating an Executable Table

The `Executable` table engine requires two parameters: the name of the script and the format of the incoming data. You can optionally pass in one or more input queries:

```sql
Executable(script_name, format, [input_query...])
```

Here are the relevant settings for an `Executable` table:

- `send_chunk_header`
    - Description: Send the number of rows in each chunk before sending the chunk to the process. This setting can help you write your script in a more efficient way by preallocating some resources (see the sketch after this list)
    - Default value: false
- `command_termination_timeout`
    - Description: Command termination timeout in seconds
    - Default value: 10
- `command_read_timeout`
    - Description: Timeout for reading data from the command's stdout in milliseconds
    - Default value: 10000
- `command_write_timeout`
    - Description: Timeout for writing data to the command's stdin in milliseconds
    - Default value: 10000
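
If `send_chunk_header` is enabled, a small header arrives before each chunk of rows. The following is only a rough, hypothetical sketch, under the assumption that the header is a single line containing the number of rows that follow; verify the exact framing against your ClickHouse version before relying on it:

```python
#!/usr/bin/python3

# Hypothetical sketch for send_chunk_header = 1.
# Assumption: each chunk of rows is preceded by one line holding the row count.

import sys

def main():
    while True:
        header = sys.stdin.readline()
        if header == '':
            break
        rows_in_chunk = int(header)

        # Preallocate and read the whole chunk before doing any work
        rows = [sys.stdin.readline().rstrip('\n') for _ in range(rows_in_chunk)]

        # Emit one tab-separated output row per input row
        for row in rows:
            print(row + '\t' + str(len(row)))
        sys.stdout.flush()

if __name__ == "__main__":
    main()
```
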
Let's look at an example. The following Python script is named `my_script.py` and is saved in the `user_scripts` folder. It reads in a number `i` and prints `i` random strings, with each string preceded by a number and a tab separator:

```python
#!/usr/bin/python3

import sys
import string
import random

def main():

    # Read input value
    for number in sys.stdin:
        i = int(number)

        # Generate some random rows
        for id in range(0, i):
            letters = string.ascii_letters
            random_string = ''.join(random.choices(letters, k=10))
            print(str(id) + '\t' + random_string + '\n', end='')

        # Flush results to stdout
        sys.stdout.flush()

if __name__ == "__main__":
    main()
```

The following `my_executable_table` is built from the output of `my_script.py`, which will generate 10 random strings every time you run a `SELECT` from `my_executable_table`:

```sql
CREATE TABLE my_executable_table (
    x UInt32,
    y String
)
ENGINE = Executable('my_script.py', TabSeparated, (SELECT 10))
```

Creating the table returns immediately and does not invoke the script. Querying `my_executable_table` causes the script to be invoked:

```sql
SELECT * FROM my_executable_table
```

```response
┌─x─┬─y──────────┐
│ 0 │ BsnKBsNGNH │
│ 1 │ mgHfBCUrWM │
│ 2 │ iDQAVhlygr │
│ 3 │ uNGwDuXyCk │
│ 4 │ GcFdQWvoLB │
│ 5 │ UkciuuOTVO │
│ 6 │ HoKeCdHkbs │
│ 7 │ xRvySxqAcR │
│ 8 │ LKbXPHpyDI │
│ 9 │ zxogHTzEVV │
└───┴────────────┘
```

## Passing Query Results to a Script

Users of the Hacker News website leave comments. Python provides a natural language processing toolkit (`nltk`) with a `SentimentIntensityAnalyzer` for determining if comments are positive, negative, or neutral - including assigning a value between -1 (a very negative comment) and 1 (a very positive comment). Let's create an `Executable` table that computes the sentiment of Hacker News comments using `nltk`.

This example uses the `hackernews` table described [here](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/invertedindexes/#full-text-search-of-the-hacker-news-dataset). The `hackernews` table includes an `id` column of type `UInt64` and a `String` column named `comment`. Let's start by defining the `Executable` table:

```sql
CREATE TABLE sentiment (
    id UInt64,
    sentiment Float32
)
ENGINE = Executable(
    'sentiment.py',
    TabSeparated,
    (SELECT id, comment FROM hackernews WHERE id > 0 AND comment != '' LIMIT 20)
);
```

Some comments about the `sentiment` table:

- The file `sentiment.py` is saved in the `user_scripts` folder (the default folder of the `user_scripts_path` setting)
- The `TabSeparated` format means our Python script needs to generate rows of raw data that contain tab-separated values
- The query selects two columns from `hackernews`. The Python script will need to parse out those column values from the incoming rows

Here is the definition of `sentiment.py`:

```python
#!/usr/local/bin/python3.9

import sys
import nltk
from nltk.sentiment import SentimentIntensityAnalyzer

def main():
    sentiment_analyzer = SentimentIntensityAnalyzer()

    while True:
        try:
            row = sys.stdin.readline()
            if row == '':
                break

            split_line = row.split("\t")

            id = str(split_line[0])
            comment = split_line[1]

            score = sentiment_analyzer.polarity_scores(comment)['compound']
            print(id + '\t' + str(score) + '\n', end='')
            sys.stdout.flush()
        except BaseException as x:
            break

if __name__ == "__main__":
    main()
```

Some comments about our Python script:

- For this to work, you will need to run `nltk.downloader.download('vader_lexicon')` once (see the snippet after this list). This could have been placed in the script, but then it would have been downloaded every time a query was executed on the `sentiment` table - which is not efficient
- Each value of `row` is going to be a row in the result set of `SELECT id, comment FROM hackernews WHERE id > 0 AND comment != '' LIMIT 20`
- The incoming row is tab-separated, so we parse out the `id` and `comment` using the Python `split` function
- The result of `polarity_scores` is a JSON object with a handful of values. We decided to just grab the `compound` value of this JSON object
- Recall that the `sentiment` table in ClickHouse uses the `TabSeparated` format and contains two columns, so our `print` function separates those columns with a tab
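
As a reference, here is a minimal sketch of that one-time setup, assuming `nltk` is already installed (for example with `pip install nltk`); run it once on the server as the same user that executes the scripts:

```python
# One-time setup: download the VADER lexicon outside of sentiment.py,
# so it is not re-downloaded on every query against the sentiment table.
import nltk

nltk.downloader.download('vader_lexicon')
```
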
Every time you write a query that selects rows from the `sentiment` table, the `SELECT id, comment FROM hackernews WHERE id > 0 AND comment != '' LIMIT 20` query is executed and the result is passed to `sentiment.py`. Let's test it out:

```sql
SELECT *
FROM sentiment
```

The response looks like:

```response
┌───────id─┬─sentiment─┐
│  7398199 │    0.4404 │
│ 21640317 │    0.1779 │
│ 21462000 │         0 │
│ 25168863 │         0 │
│ 25168978 │   -0.1531 │
│ 25169359 │         0 │
│ 25169394 │   -0.9231 │
│ 25169766 │    0.4137 │
│ 25172570 │    0.7469 │
│ 25173687 │    0.6249 │
│ 28291534 │         0 │
│ 28291669 │   -0.4767 │
│ 28291731 │         0 │
│ 28291949 │   -0.4767 │
│ 28292004 │    0.3612 │
│ 28292050 │    -0.296 │
│ 28292322 │         0 │
│ 28295172 │    0.7717 │
│ 28295288 │    0.4404 │
│ 21465723 │   -0.6956 │
└──────────┴───────────┘
```

## Creating an ExecutablePool Table

The syntax for `ExecutablePool` is similar to `Executable`, but there are a couple of relevant settings unique to an `ExecutablePool` table:

- `pool_size`
    - Description: Size of the pool of processes. If the size is 0, then there are no size restrictions
    - Default value: 16
- `max_command_execution_time`
    - Description: Maximum command execution time in seconds
    - Default value: 10

We can easily convert the `sentiment` table above to use `ExecutablePool` instead of `Executable`:

```sql
CREATE TABLE sentiment_pooled (
    id UInt64,
    sentiment Float32
)
ENGINE = ExecutablePool(
    'sentiment.py',
    TabSeparated,
    (SELECT id, comment FROM hackernews WHERE id > 0 AND comment != '' LIMIT 20000)
)
SETTINGS
    pool_size = 4;
```

ClickHouse will maintain 4 processes on demand when your client queries the `sentiment_pooled` table.

docs/en/sql-reference/table-functions/executable.md (new file)
@@ -0,0 +1,97 @@
---
slug: /en/engines/table-functions/executable
sidebar_position: 55
sidebar_label: executable
keywords: [udf, user defined function, clickhouse, executable, table, function]
---

# executable Table Function for UDFs

The `executable` table function creates a table based on the output of a user-defined function (UDF) that you define in a script that outputs rows to **stdout**. The executable script is stored in the `user_scripts` directory and can read data from any source.

You can optionally include one or more input queries that stream their results to **stdin** for the script to read.

:::note
A key advantage of the `executable` table function and the `Executable` table engine over ordinary UDF functions is that ordinary UDF functions cannot change the row count. For example, if the input is 100 rows, then the result must return 100 rows. When using the `executable` table function or `Executable` table engine, your script can make any data transformations you want, including complex aggregations (a sketch of such a script appears after the first example below).
:::

## Syntax

The `executable` table function requires three parameters and accepts an optional list of input queries:

```sql
executable(script_name, format, structure, [input_query...])
```

- `script_name`: the file name of the script, saved in the `user_scripts` folder (the default folder of the `user_scripts_path` setting)
- `format`: the format of the generated table
- `structure`: the table schema of the generated table
- `input_query`: an optional query (or collection of queries) whose results are passed to the script via **stdin**

:::note
If you are going to invoke the same script repeatedly with the same input queries, consider using the [`Executable` table engine](../../engines/table-engines/special/executable.md).
:::

The following Python script is named `generate_random.py` and is saved in the `user_scripts` folder. It reads in a number `i` and prints `i` random strings, with each string preceded by a number and a tab separator:

```python
#!/usr/local/bin/python3.9

import sys
import string
import random

def main():

    # Read input value
    for number in sys.stdin:
        i = int(number)

        # Generate some random rows
        for id in range(0, i):
            letters = string.ascii_letters
            random_string = ''.join(random.choices(letters, k=10))
            print(str(id) + '\t' + random_string + '\n', end='')

        # Flush results to stdout
        sys.stdout.flush()

if __name__ == "__main__":
    main()
```

Let's invoke the script and have it generate 10 random strings:

```sql
SELECT * FROM executable('generate_random.py', TabSeparated, 'id UInt32, random String', (SELECT 10))
```

The response looks like:

```response
┌─id─┬─random─────┐
│  0 │ xheXXCiSkH │
│  1 │ AqxvHAoTrl │
│  2 │ JYvPCEbIkY │
│  3 │ sWgnqJwGRm │
│  4 │ fTZGrjcLon │
│  5 │ ZQINGktPnd │
│  6 │ YFSvGGoezb │
│  7 │ QyMJJZOOia │
│  8 │ NfiyDDhmcI │
│  9 │ REJRdJpWrg │
└────┴────────────┘
```
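
To illustrate the earlier note about scripts being able to change the row count, here is a hypothetical aggregation script - call it `aggregate.py`, saved in the `user_scripts` folder. The name, columns, and invocation are illustrative assumptions, not part of the shipped examples. It collapses all of its tab-separated input rows into a single output row containing a row count and the sum of the second column, and could be invoked with something like `SELECT * FROM executable('aggregate.py', TabSeparated, 'rows UInt64, total Int64', (SELECT number, number * 2 FROM numbers(100)))`:

```python
#!/usr/bin/python3

# Hypothetical sketch: reduce many tab-separated input rows to one output row.

import sys

def main():
    row_count = 0
    total = 0
    for row in sys.stdin:
        row = row.rstrip('\n')
        if row == '':
            continue
        columns = row.split('\t')
        row_count += 1
        total += int(columns[1])  # assumes the second input column is an integer

    # Exactly one output row, regardless of how many rows came in
    print(str(row_count) + '\t' + str(total) + '\n', end='')
    sys.stdout.flush()

if __name__ == "__main__":
    main()
```
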
## Passing Query Results to a Script

Be sure to check out the example in the `Executable` table engine documentation on [how to pass query results to a script](../../engines/table-engines/special/executable#passing-query-results-to-a-script). Here is how you execute the same script in that example using the `executable` table function:

```sql
SELECT * FROM executable(
    'sentiment.py',
    TabSeparated,
    'id UInt64, sentiment Float32',
    (SELECT id, comment FROM hackernews WHERE id > 0 AND comment != '' LIMIT 20)
);
```

@@ -953,7 +953,12 @@ void BackupImpl::writeFile(const String & file_name, BackupEntryPtr entry)
{
LOG_TRACE(log, "Will copy file {}", adjusted_path);

if (!num_entries)
bool has_entries = false;
{
std::lock_guard lock{mutex};
has_entries = num_entries > 0;
}
if (!has_entries)
checkLockFile(true);

if (use_archives)

src/Disks/tests/gtest_azure_sdk.cpp (new file)
@@ -0,0 +1,41 @@
#include <string>
#include <vector>
#include <Common/logger_useful.h>

#include "config.h"

#if USE_AZURE_BLOB_STORAGE

#include <azure/storage/blobs.hpp>
#include <azure/storage/common/internal/xml_wrapper.hpp>
#include <azure/storage/blobs/blob_container_client.hpp>
#include <azure/storage/blobs/blob_options.hpp>

#include <gtest/gtest.h>

TEST(AzureXMLWrapper, TestLeak)
{
    std::string str = "<hello>world</hello>";

    Azure::Storage::_internal::XmlReader reader(str.c_str(), str.length());
    Azure::Storage::_internal::XmlReader reader2(std::move(reader));
    Azure::Storage::_internal::XmlReader reader3 = std::move(reader2);
    reader3.Read();
}

TEST(AzureBlobContainerClient, CurlMemoryLeak)
{
    using Azure::Storage::Blobs::BlobContainerClient;
    using Azure::Storage::Blobs::BlobClientOptions;

    static constexpr auto unavailable_url = "http://unavailable:19999/bucket";
    static constexpr auto container = "container";

    BlobClientOptions options;
    options.Retry.MaxRetries = 0;

    auto client = std::make_unique<BlobContainerClient>(BlobContainerClient::CreateFromConnectionString(unavailable_url, container, options));
    EXPECT_THROW({ client->ListBlobs(); }, Azure::Core::Http::TransportException);
}

#endif

@@ -1,25 +0,0 @@
#include <string>
#include <vector>
#include <Common/logger_useful.h>

#include "config.h"

#if USE_AZURE_BLOB_STORAGE

#include <azure/storage/blobs.hpp>
#include <azure/storage/common/internal/xml_wrapper.hpp>

#include <gtest/gtest.h>


TEST(AzureXMLWrapper, TestLeak)
{
    std::string str = "<hello>world</hello>";

    Azure::Storage::_internal::XmlReader reader(str.c_str(), str.length());
    Azure::Storage::_internal::XmlReader reader2(std::move(reader));
    Azure::Storage::_internal::XmlReader reader3 = std::move(reader2);
    reader3.Read();
}

#endif

@@ -124,6 +124,8 @@ size_t IntersectOrExceptTransform::buildFilter(

void IntersectOrExceptTransform::accumulate(Chunk chunk)
{
    convertToFullIfSparse(chunk);

    auto num_rows = chunk.getNumRows();
    auto columns = chunk.detachColumns();

@@ -160,6 +162,8 @@ void IntersectOrExceptTransform::accumulate(Chunk chunk)

void IntersectOrExceptTransform::filter(Chunk & chunk)
{
    convertToFullIfSparse(chunk);

    auto num_rows = chunk.getNumRows();
    auto columns = chunk.detachColumns();

@@ -1601,37 +1601,39 @@ void StorageMergeTree::renameAndCommitEmptyParts(MutableDataPartsVector & new_pa

void StorageMergeTree::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr query_context, TableExclusiveLockHolder &)
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = stopMergesAndWait();
waitForOutdatedPartsToBeLoaded();

Stopwatch watch;

auto txn = query_context->getCurrentTransaction();
MergeTreeData::Transaction transaction(*this, txn.get());
{
auto operation_data_parts_lock = lockOperationsWithParts();
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = stopMergesAndWait();
waitForOutdatedPartsToBeLoaded();

auto parts = getVisibleDataPartsVector(query_context);
Stopwatch watch;

auto future_parts = initCoverageWithNewEmptyParts(parts);
auto txn = query_context->getCurrentTransaction();
MergeTreeData::Transaction transaction(*this, txn.get());
{
auto operation_data_parts_lock = lockOperationsWithParts();

LOG_TEST(log, "Made {} empty parts in order to cover {} parts. Empty parts: {}, covered parts: {}. With txn {}",
future_parts.size(), parts.size(),
fmt::join(getPartsNames(future_parts), ", "), fmt::join(getPartsNames(parts), ", "),
transaction.getTID());
auto parts = getVisibleDataPartsVector(query_context);

captureTmpDirectoryHolders(*this, future_parts);
auto future_parts = initCoverageWithNewEmptyParts(parts);

auto new_data_parts = createEmptyDataParts(*this, future_parts, txn);
renameAndCommitEmptyParts(new_data_parts, transaction);
LOG_TEST(log, "Made {} empty parts in order to cover {} parts. Empty parts: {}, covered parts: {}. With txn {}",
future_parts.size(), parts.size(),
fmt::join(getPartsNames(future_parts), ", "), fmt::join(getPartsNames(parts), ", "),
transaction.getTID());

PartLog::addNewParts(query_context, new_data_parts, watch.elapsed());
captureTmpDirectoryHolders(*this, future_parts);

LOG_INFO(log, "Truncated table with {} parts by replacing them with new empty {} parts. With txn {}",
parts.size(), future_parts.size(),
transaction.getTID());
auto new_data_parts = createEmptyDataParts(*this, future_parts, txn);
renameAndCommitEmptyParts(new_data_parts, transaction);

PartLog::addNewParts(query_context, new_data_parts, watch.elapsed());

LOG_INFO(log, "Truncated table with {} parts by replacing them with new empty {} parts. With txn {}",
parts.size(), future_parts.size(),
transaction.getTID());
}
}

/// Old parts are needed to be destroyed before clearing them from filesystem.

@@ -1642,48 +1644,50 @@ void StorageMergeTree::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont

void StorageMergeTree::dropPart(const String & part_name, bool detach, ContextPtr query_context)
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = stopMergesAndWait();

Stopwatch watch;

/// It's important to create it outside of lock scope because
/// otherwise it can lock parts in destructor and deadlock is possible.
auto txn = query_context->getCurrentTransaction();
MergeTreeData::Transaction transaction(*this, txn.get());
{
auto operation_data_parts_lock = lockOperationsWithParts();
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = stopMergesAndWait();

auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Active});
if (!part)
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "Part {} not found, won't try to drop it.", part_name);
Stopwatch watch;

if (detach)
/// It's important to create it outside of lock scope because
/// otherwise it can lock parts in destructor and deadlock is possible.
auto txn = query_context->getCurrentTransaction();
MergeTreeData::Transaction transaction(*this, txn.get());
{
auto metadata_snapshot = getInMemoryMetadataPtr();
LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
part->makeCloneInDetached("", metadata_snapshot);
}
auto operation_data_parts_lock = lockOperationsWithParts();

{
auto future_parts = initCoverageWithNewEmptyParts({part});
auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Active});
if (!part)
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "Part {} not found, won't try to drop it.", part_name);

LOG_TEST(log, "Made {} empty parts in order to cover {} part. With txn {}",
fmt::join(getPartsNames(future_parts), ", "), fmt::join(getPartsNames({part}), ", "),
transaction.getTID());
if (detach)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
part->makeCloneInDetached("", metadata_snapshot);
}

captureTmpDirectoryHolders(*this, future_parts);
{
auto future_parts = initCoverageWithNewEmptyParts({part});

auto new_data_parts = createEmptyDataParts(*this, future_parts, txn);
renameAndCommitEmptyParts(new_data_parts, transaction);
LOG_TEST(log, "Made {} empty parts in order to cover {} part. With txn {}",
fmt::join(getPartsNames(future_parts), ", "), fmt::join(getPartsNames({part}), ", "),
transaction.getTID());

PartLog::addNewParts(query_context, new_data_parts, watch.elapsed());
captureTmpDirectoryHolders(*this, future_parts);

const auto * op = detach ? "Detached" : "Dropped";
LOG_INFO(log, "{} {} part by replacing it with new empty {} part. With txn {}",
op, part->name, future_parts[0].part_name,
transaction.getTID());
auto new_data_parts = createEmptyDataParts(*this, future_parts, txn);
renameAndCommitEmptyParts(new_data_parts, transaction);

PartLog::addNewParts(query_context, new_data_parts, watch.elapsed());

const auto * op = detach ? "Detached" : "Dropped";
LOG_INFO(log, "{} {} part by replacing it with new empty {} part. With txn {}",
op, part->name, future_parts[0].part_name,
transaction.getTID());
}
}
}

@@ -1695,58 +1699,60 @@ void StorageMergeTree::dropPart(const String & part_name, bool detach, ContextPt

void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, ContextPtr query_context)
{
const auto * partition_ast = partition->as<ASTPartition>();

/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = stopMergesAndWait();

Stopwatch watch;

/// It's important to create it outside of lock scope because
/// otherwise it can lock parts in destructor and deadlock is possible.
auto txn = query_context->getCurrentTransaction();
MergeTreeData::Transaction transaction(*this, txn.get());
{
auto operation_data_parts_lock = lockOperationsWithParts();
const auto * partition_ast = partition->as<ASTPartition>();

DataPartsVector parts;
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = stopMergesAndWait();

Stopwatch watch;

/// It's important to create it outside of lock scope because
/// otherwise it can lock parts in destructor and deadlock is possible.
auto txn = query_context->getCurrentTransaction();
MergeTreeData::Transaction transaction(*this, txn.get());
{
if (partition_ast && partition_ast->all)
parts = getVisibleDataPartsVector(query_context);
else
auto operation_data_parts_lock = lockOperationsWithParts();

DataPartsVector parts;
{
String partition_id = getPartitionIDFromQuery(partition, query_context);
parts = getVisibleDataPartsVectorInPartition(query_context, partition_id);
if (partition_ast && partition_ast->all)
parts = getVisibleDataPartsVector(query_context);
else
{
String partition_id = getPartitionIDFromQuery(partition, query_context);
parts = getVisibleDataPartsVectorInPartition(query_context, partition_id);
}
}

if (detach)
for (const auto & part : parts)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
part->makeCloneInDetached("", metadata_snapshot);
}

auto future_parts = initCoverageWithNewEmptyParts(parts);

LOG_TEST(log, "Made {} empty parts in order to cover {} parts. Empty parts: {}, covered parts: {}. With txn {}",
future_parts.size(), parts.size(),
fmt::join(getPartsNames(future_parts), ", "), fmt::join(getPartsNames(parts), ", "),
transaction.getTID());

captureTmpDirectoryHolders(*this, future_parts);

auto new_data_parts = createEmptyDataParts(*this, future_parts, txn);
renameAndCommitEmptyParts(new_data_parts, transaction);

PartLog::addNewParts(query_context, new_data_parts, watch.elapsed());

const auto * op = detach ? "Detached" : "Dropped";
LOG_INFO(log, "{} partition with {} parts by replacing them with new empty {} parts. With txn {}",
op, parts.size(), future_parts.size(),
transaction.getTID());
}

if (detach)
for (const auto & part : parts)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
part->makeCloneInDetached("", metadata_snapshot);
}

auto future_parts = initCoverageWithNewEmptyParts(parts);

LOG_TEST(log, "Made {} empty parts in order to cover {} parts. Empty parts: {}, covered parts: {}. With txn {}",
future_parts.size(), parts.size(),
fmt::join(getPartsNames(future_parts), ", "), fmt::join(getPartsNames(parts), ", "),
transaction.getTID());

captureTmpDirectoryHolders(*this, future_parts);

auto new_data_parts = createEmptyDataParts(*this, future_parts, txn);
renameAndCommitEmptyParts(new_data_parts, transaction);

PartLog::addNewParts(query_context, new_data_parts, watch.elapsed());

const auto * op = detach ? "Detached" : "Dropped";
LOG_INFO(log, "{} partition with {} parts by replacing them with new empty {} parts. With txn {}",
op, parts.size(), future_parts.size(),
transaction.getTID());
}

/// Old parts are needed to be destroyed before clearing them from filesystem.

@@ -0,0 +1,2 @@
0
2000

tests/queries/0_stateless/02552_sparse_columns_intersect.sql (new file)
@@ -0,0 +1,14 @@
DROP TABLE IF EXISTS t_sparse_intersect;

CREATE TABLE t_sparse_intersect (a UInt64, c Int64) ENGINE = MergeTree
ORDER BY tuple() SETTINGS ratio_of_defaults_for_sparse_serialization = 0.8;

SYSTEM STOP MERGES t_sparse_intersect;

INSERT INTO t_sparse_intersect SELECT if (number % 10 = 0, number, 0), number FROM numbers(1000);
INSERT INTO t_sparse_intersect SELECT number, number FROM numbers(1000);

SELECT count() FROM (SELECT * FROM t_sparse_intersect EXCEPT SELECT * FROM t_sparse_intersect);
SELECT count() FROM (SELECT * FROM t_sparse_intersect INTERSECT SELECT * FROM t_sparse_intersect);

DROP TABLE t_sparse_intersect;