Merge branch 'ClickHouse:master' into 56257_parse_crlf_with_TSV_files

Shaun Struwig 2024-05-15 05:28:18 +02:00 committed by GitHub
commit 47ab2e2dc5
51 changed files with 884 additions and 179 deletions


@@ -85,4 +85,4 @@ At a minimum, the following information should be added (but add more as needed)
- [ ] <!---batch_2--> 3
- [ ] <!---batch_3--> 4
<details>
</details>


@@ -9,6 +9,12 @@ on: # yamllint disable-line rule:truthy
push:
branches:
- 'backport/**'
# Cancel the previous wf run in PRs.
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
jobs:
RunConfig:
runs-on: [self-hosted, style-checker-aarch64]


@@ -1,19 +0,0 @@
name: Cancel
env:
# Force the stdout and stderr streams to be unbuffered
PYTHONUNBUFFERED: 1
on: # yamllint disable-line rule:truthy
workflow_run:
workflows: ["PullRequestCI", "ReleaseBranchCI", "DocsCheck", "BackportPR"]
types:
- requested
jobs:
cancel:
runs-on: [self-hosted, style-checker]
steps:
- uses: styfle/cancel-workflow-action@0.9.1
with:
all_but_latest: true
workflow_id: ${{ github.event.workflow.id }}


@@ -1,11 +0,0 @@
# The CI for each commit, prints envs and content of GITHUB_EVENT_PATH
name: Debug
'on':
[push, pull_request, pull_request_review, release, workflow_dispatch, workflow_call]
jobs:
DebugInfo:
runs-on: ubuntu-latest
steps:
- uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6


@@ -10,14 +10,13 @@ env:
workflow_dispatch:
jobs:
Debug:
# The task for having a preserved ENV and event.json for later investigation
uses: ./.github/workflows/debug.yml
RunConfig:
runs-on: [self-hosted, style-checker-aarch64]
outputs:
data: ${{ steps.runconfig.outputs.CI_DATA }}
steps:
- name: DebugInfo
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:


@@ -14,6 +14,11 @@ on: # yamllint disable-line rule:truthy
branches:
- master
# Cancel the previous wf run in PRs.
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
jobs:
RunConfig:
runs-on: [self-hosted, style-checker-aarch64]


@@ -1,23 +0,0 @@
name: PullRequestApprovedCI
env:
# Force the stdout and stderr streams to be unbuffered
PYTHONUNBUFFERED: 1
on: # yamllint disable-line rule:truthy
pull_request_review:
types:
- submitted
jobs:
MergeOnApproval:
runs-on: [self-hosted, style-checker]
steps:
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
- name: Merge approved PR
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
python3 merge_pr.py --check-approved


@@ -75,7 +75,7 @@ The supported formats are:
| [ArrowStream](#data-format-arrow-stream) | ✔ | ✔ |
| [ORC](#data-format-orc) | ✔ | ✔ |
| [One](#data-format-one) | ✔ | ✗ |
| [Npy](#data-format-npy) | ✔ | ✗ |
| [Npy](#data-format-npy) | ✔ | ✔ |
| [RowBinary](#rowbinary) | ✔ | ✔ |
| [RowBinaryWithNames](#rowbinarywithnamesandtypes) | ✔ | ✔ |
| [RowBinaryWithNamesAndTypes](#rowbinarywithnamesandtypes) | ✔ | ✔ |
@@ -2467,23 +2467,22 @@ Result:
## Npy {#data-format-npy}
This function is designed to load a NumPy array from a .npy file into ClickHouse. The NumPy file format is a binary format used for efficiently storing arrays of numerical data. During import, ClickHouse treats top level dimension as an array of rows with single column. Supported Npy data types and their corresponding type in ClickHouse:
| Npy type | ClickHouse type |
|:--------:|:---------------:|
| b1 | UInt8 |
| i1 | Int8 |
| i2 | Int16 |
| i4 | Int32 |
| i8 | Int64 |
| u1 | UInt8 |
| u2 | UInt16 |
| u4 | UInt32 |
| u8 | UInt64 |
| f2 | Float32 |
| f4 | Float32 |
| f8 | Float64 |
| S | String |
| U | String |
This format is designed to load a NumPy array from a .npy file into ClickHouse. The NumPy file format is a binary format used for efficiently storing arrays of numerical data. During import, ClickHouse treats the top-level dimension as an array of rows with a single column. Supported Npy data types and their corresponding ClickHouse types:
| Npy data type (`INSERT`) | ClickHouse data type | Npy data type (`SELECT`) |
|--------------------------|-----------------------------------------------------------------|--------------------------|
| `i1` | [Int8](/docs/en/sql-reference/data-types/int-uint.md) | `i1` |
| `i2` | [Int16](/docs/en/sql-reference/data-types/int-uint.md) | `i2` |
| `i4` | [Int32](/docs/en/sql-reference/data-types/int-uint.md) | `i4` |
| `i8` | [Int64](/docs/en/sql-reference/data-types/int-uint.md) | `i8` |
| `u1`, `b1` | [UInt8](/docs/en/sql-reference/data-types/int-uint.md) | `u1` |
| `u2` | [UInt16](/docs/en/sql-reference/data-types/int-uint.md) | `u2` |
| `u4` | [UInt32](/docs/en/sql-reference/data-types/int-uint.md) | `u4` |
| `u8` | [UInt64](/docs/en/sql-reference/data-types/int-uint.md) | `u8` |
| `f2`, `f4` | [Float32](/docs/en/sql-reference/data-types/float.md) | `f4` |
| `f8` | [Float64](/docs/en/sql-reference/data-types/float.md) | `f8` |
| `S`, `U` | [String](/docs/en/sql-reference/data-types/string.md) | `S` |
| | [FixedString](/docs/en/sql-reference/data-types/fixedstring.md) | `S` |
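As a quick sanity check of the mapping above, NumPy resolves the same letter-plus-width codes to the expected element types (a small snippet, assuming NumPy is installed):

```python
import numpy as np

# Spot-check the Npy type codes from the table above.
for code in ["i1", "i2", "i4", "i8", "u1", "u2", "u4", "u8", "f4", "f8"]:
    print(code, "->", np.dtype(code).name)  # e.g. i4 -> int32, u8 -> uint64
```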
**Example of saving an array in .npy format using Python**
@@ -2510,6 +2509,14 @@ Result:
└───────────────┘
```
**Selecting Data**
You can select data from a ClickHouse table and save it into a file in the Npy format with the following command:
```bash
$ clickhouse-client --query="SELECT {column} FROM {some_table} FORMAT Npy" > {filename.npy}
```
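Files produced this way follow the standard NumPy format, so they can be loaded back with `np.load`. A minimal sketch (`data.npy` is a hypothetical output path):

```python
import numpy as np

# Load a .npy file written by `FORMAT Npy` and inspect it.
arr = np.load("data.npy")
print(arr.dtype, arr.shape)
```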
## LineAsString {#lineasstring}
In this format, every line of input data is interpreted as a single string value. This format can only be parsed for table with a single field of type [String](/docs/en/sql-reference/data-types/string.md). The remaining columns must be set to [DEFAULT](/docs/en/sql-reference/statements/create/table.md/#default) or [MATERIALIZED](/docs/en/sql-reference/statements/create/table.md/#materialized), or omitted.


@@ -2329,7 +2329,7 @@ void QueryAnalyzer::replaceNodesWithPositionalArguments(QueryTreeNodePtr & node_
pos = value;
else
{
if (static_cast<size_t>(std::abs(value)) > projection_nodes.size())
if (value < -static_cast<Int64>(projection_nodes.size()))
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Negative positional argument number {} is out of bounds. Expected in range [-{}, -1]. In scope {}",
@@ -4610,7 +4610,7 @@ QueryAnalyzer::QueryTreeNodesWithNames QueryAnalyzer::resolveUnqualifiedMatcher(
for (auto & table_expression : table_expressions_stack)
{
bool table_expression_in_resolve_process = scope.table_expressions_in_resolve_process.contains(table_expression.get());
bool table_expression_in_resolve_process = nearest_query_scope->table_expressions_in_resolve_process.contains(table_expression.get());
if (auto * array_join_node = table_expression->as<ArrayJoinNode>())
{
@@ -8059,7 +8059,12 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
window_node_typed.setParentWindowName({});
}
scope.window_name_to_window_node.emplace(window_node_typed.getAlias(), window_node);
auto [_, inserted] = scope.window_name_to_window_node.emplace(window_node_typed.getAlias(), window_node);
if (!inserted)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Window '{}' is already defined. In scope {}",
window_node_typed.getAlias(),
scope.scope_node->formatASTForErrorMessage());
}
/** Disable identifier cache during JOIN TREE resolve.


@@ -1,7 +1,6 @@
#pragma once
#include <cstring>
#include <cassert>
#include <Columns/IColumn.h>
#include <Columns/IColumnImpl.h>
@@ -12,6 +11,8 @@
#include <Common/assert_cast.h>
#include <Core/Field.h>
#include <base/defines.h>
class Collator;
@@ -42,7 +43,11 @@ private:
size_t ALWAYS_INLINE offsetAt(ssize_t i) const { return offsets[i - 1]; }
/// Size of i-th element, including terminating zero.
size_t ALWAYS_INLINE sizeAt(ssize_t i) const { return offsets[i] - offsets[i - 1]; }
size_t ALWAYS_INLINE sizeAt(ssize_t i) const
{
chassert(offsets[i] > offsets[i - 1]);
return offsets[i] - offsets[i - 1];
}
struct ComparatorBase;
@@ -79,7 +84,7 @@ public:
size_t byteSizeAt(size_t n) const override
{
assert(n < size());
chassert(n < size());
return sizeAt(n) + sizeof(offsets[0]);
}
@@ -94,25 +99,25 @@ public:
Field operator[](size_t n) const override
{
assert(n < size());
chassert(n < size());
return Field(&chars[offsetAt(n)], sizeAt(n) - 1);
}
void get(size_t n, Field & res) const override
{
assert(n < size());
chassert(n < size());
res = std::string_view{reinterpret_cast<const char *>(&chars[offsetAt(n)]), sizeAt(n) - 1};
}
StringRef getDataAt(size_t n) const override
{
assert(n < size());
chassert(n < size());
return StringRef(&chars[offsetAt(n)], sizeAt(n) - 1);
}
bool isDefaultAt(size_t n) const override
{
assert(n < size());
chassert(n < size());
return sizeAt(n) == 1;
}
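For context on the `chassert`s above: `chars` stores all strings back to back, each followed by a terminating zero, and `offsets[i]` is the end position of element `i` (with an implicit `offsets[-1] == 0`), so `sizeAt` must always return at least 1. A rough Python sketch of that layout:

```python
# Two strings, "foo" and "ab", each followed by a terminating zero byte.
chars = b"foo\x00ab\x00"
offsets = [4, 7]  # cumulative end positions; offsets[-1] is implicitly 0

def size_at(i):
    # Includes the terminating zero, hence always >= 1.
    return offsets[i] - (offsets[i - 1] if i > 0 else 0)

print(size_at(0), size_at(1))  # 4 3
```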


@@ -21,5 +21,8 @@ template class ColumnUnique<ColumnFloat64>;
template class ColumnUnique<ColumnString>;
template class ColumnUnique<ColumnFixedString>;
template class ColumnUnique<ColumnDateTime64>;
template class ColumnUnique<ColumnIPv4>;
template class ColumnUnique<ColumnIPv6>;
template class ColumnUnique<ColumnUUID>;
}


@@ -57,7 +57,7 @@ namespace DB
M(Bool, allow_use_jemalloc_memory, true, "Allows to use jemalloc memory.", 0) \
M(UInt64, cgroups_memory_usage_observer_wait_time, 15, "Polling interval in seconds to read the current memory usage from cgroups. Zero means disabled.", 0) \
M(Double, cgroup_memory_watcher_hard_limit_ratio, 0.95, "Hard memory limit ratio for cgroup memory usage observer", 0) \
M(Double, cgroup_memory_watcher_soft_limit_ratio, 0.9, "Sort memory limit ratio limit for cgroup memory usage observer", 0) \
M(Double, cgroup_memory_watcher_soft_limit_ratio, 0.9, "Soft memory limit ratio for cgroup memory usage observer", 0) \
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert_queue_flush_on_shutdown, true, "If true queue of asynchronous inserts is flushed on graceful shutdown", 0) \
M(Bool, ignore_empty_sql_security_in_create_view_query, true, "If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. This setting is only necessary for the migration period and will become obsolete in 24.4", 0) \


@@ -1,10 +1,12 @@
#pragma once
#include <cstddef>
#include <Storages/NamedCollectionsHelpers.h>
#include <IO/WriteBufferFromString.h>
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NOT_IMPLEMENTED;
}
enum class NumpyDataTypeIndex : uint8_t
@@ -29,9 +31,9 @@ class NumpyDataType
public:
enum Endianness
{
LITTLE,
BIG,
NONE,
LITTLE = '<',
BIG = '>',
NONE = '|',
};
NumpyDataTypeIndex type_index;
@@ -41,15 +43,18 @@ public:
Endianness getEndianness() const { return endianness; }
virtual NumpyDataTypeIndex getTypeIndex() const = 0;
virtual size_t getSize() const { throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Function getSize() is not implemented"); }
virtual void setSize(size_t) { throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Function setSize() is not implemented"); }
virtual String str() const { throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Function str() is not implemented"); }
private:
protected:
Endianness endianness;
};
class NumpyDataTypeInt : public NumpyDataType
{
public:
NumpyDataTypeInt(Endianness endianness, size_t size_, bool is_signed_) : NumpyDataType(endianness), size(size_), is_signed(is_signed_)
NumpyDataTypeInt(Endianness endianness_, size_t size_, bool is_signed_) : NumpyDataType(endianness_), size(size_), is_signed(is_signed_)
{
switch (size)
{
@@ -67,6 +72,14 @@ public:
return type_index;
}
bool isSigned() const { return is_signed; }
String str() const override
{
DB::WriteBufferFromOwnString buf;
writeChar(static_cast<char>(endianness), buf);
writeChar(is_signed ? 'i' : 'u', buf);
writeIntText(size, buf);
return buf.str();
}
private:
size_t size;
@@ -76,7 +89,7 @@ private:
class NumpyDataTypeFloat : public NumpyDataType
{
public:
NumpyDataTypeFloat(Endianness endianness, size_t size_) : NumpyDataType(endianness), size(size_)
NumpyDataTypeFloat(Endianness endianness_, size_t size_) : NumpyDataType(endianness_), size(size_)
{
switch (size)
{
@@ -92,6 +105,14 @@ public:
{
return type_index;
}
String str() const override
{
DB::WriteBufferFromOwnString buf;
writeChar(static_cast<char>(endianness), buf);
writeChar('f', buf);
writeIntText(size, buf);
return buf.str();
}
private:
size_t size;
};
@@ -99,13 +120,22 @@ private:
class NumpyDataTypeString : public NumpyDataType
{
public:
NumpyDataTypeString(Endianness endianness, size_t size_) : NumpyDataType(endianness), size(size_)
NumpyDataTypeString(Endianness endianness_, size_t size_) : NumpyDataType(endianness_), size(size_)
{
type_index = NumpyDataTypeIndex::String;
}
NumpyDataTypeIndex getTypeIndex() const override { return type_index; }
size_t getSize() const { return size; }
size_t getSize() const override { return size; }
void setSize(size_t size_) override { size = size_; }
String str() const override
{
DB::WriteBufferFromOwnString buf;
writeChar(static_cast<char>(endianness), buf);
writeChar('S', buf);
writeIntText(size, buf);
return buf.str();
}
private:
size_t size;
};
@@ -113,13 +143,13 @@ private:
class NumpyDataTypeUnicode : public NumpyDataType
{
public:
NumpyDataTypeUnicode(Endianness endianness, size_t size_) : NumpyDataType(endianness), size(size_)
NumpyDataTypeUnicode(Endianness endianness_, size_t size_) : NumpyDataType(endianness_), size(size_)
{
type_index = NumpyDataTypeIndex::Unicode;
}
NumpyDataTypeIndex getTypeIndex() const override { return type_index; }
size_t getSize() const { return size * 4; }
size_t getSize() const override { return size * 4; }
private:
size_t size;
};
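The character values assigned to `Endianness` above ('<', '>', '|') match the byte-order prefixes NumPy itself uses in a dtype's `descr` string, which is what the new `str()` methods assemble. A quick NumPy check (byte order shown for a little-endian machine):

```python
import numpy as np

# dtype.str is the descr token stored in an .npy header.
print(np.dtype(np.int32).str)    # '<i4'  little-endian signed 4-byte int
print(np.dtype(np.float64).str)  # '<f8'
print(np.dtype("S10").str)       # '|S10' byte strings have no byte order
```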


@@ -76,6 +76,8 @@ void registerInputFormatCustomSeparated(FormatFactory & factory);
void registerOutputFormatCustomSeparated(FormatFactory & factory);
void registerInputFormatCapnProto(FormatFactory & factory);
void registerOutputFormatCapnProto(FormatFactory & factory);
void registerInputFormatNpy(FormatFactory & factory);
void registerOutputFormatNpy(FormatFactory & factory);
void registerInputFormatForm(FormatFactory & factory);
/// Output only (presentational) formats.
@@ -104,7 +106,6 @@ void registerInputFormatMySQLDump(FormatFactory & factory);
void registerInputFormatParquetMetadata(FormatFactory & factory);
void registerInputFormatDWARF(FormatFactory & factory);
void registerInputFormatOne(FormatFactory & factory);
void registerInputFormatNpy(FormatFactory & factory);
#if USE_HIVE
void registerInputFormatHiveText(FormatFactory & factory);
@@ -224,6 +225,8 @@ void registerFormats()
registerOutputFormatAvro(factory);
registerInputFormatArrow(factory);
registerOutputFormatArrow(factory);
registerInputFormatNpy(factory);
registerOutputFormatNpy(factory);
registerOutputFormatPretty(factory);
registerOutputFormatPrettyCompact(factory);
@@ -254,7 +257,6 @@ void registerFormats()
registerInputFormatParquetMetadata(factory);
registerInputFormatDWARF(factory);
registerInputFormatOne(factory);
registerInputFormatNpy(factory);
registerNonTrivialPrefixAndSuffixCheckerJSONEachRow(factory);
registerNonTrivialPrefixAndSuffixCheckerJSONAsString(factory);


@@ -35,10 +35,17 @@
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/formatReadable.h>
#include "Core/Joins.h"
#include "Interpreters/TemporaryDataOnDisk.h"
#include <Functions/FunctionHelpers.h>
#include <Interpreters/castColumn.h>
namespace CurrentMetrics
{
extern const Metric TemporaryFilesForJoin;
}
namespace DB
{
@@ -63,6 +70,7 @@ struct NotProcessedCrossJoin : public ExtraBlock
{
size_t left_position;
size_t right_block;
std::unique_ptr<TemporaryFileStream::Reader> reader;
};
@@ -249,6 +257,10 @@ HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_s
, instance_id(instance_id_)
, asof_inequality(table_join->getAsofInequality())
, data(std::make_shared<RightTableData>())
, tmp_data(
table_join_->getTempDataOnDisk()
? std::make_unique<TemporaryDataOnDisk>(table_join_->getTempDataOnDisk(), CurrentMetrics::TemporaryFilesForJoin)
: nullptr)
, right_sample_block(right_sample_block_)
, max_joined_block_rows(table_join->maxJoinedBlockRows())
, instance_log_id(!instance_id_.empty() ? "(" + instance_id_ + ") " : "")
@@ -827,6 +839,21 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
if (shrink_blocks)
block_to_save = block_to_save.shrinkToFit();
size_t max_bytes_in_join = table_join->sizeLimits().max_bytes;
size_t max_rows_in_join = table_join->sizeLimits().max_rows;
if (kind == JoinKind::Cross && tmp_data
&& (tmp_stream || (max_bytes_in_join && getTotalByteCount() + block_to_save.allocatedBytes() >= max_bytes_in_join)
|| (max_rows_in_join && getTotalRowCount() + block_to_save.rows() >= max_rows_in_join)))
{
if (tmp_stream == nullptr)
{
tmp_stream = &tmp_data->createStream(right_sample_block);
}
tmp_stream->write(block_to_save);
return true;
}
size_t total_rows = 0;
size_t total_bytes = 0;
{
@@ -944,7 +971,6 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
shrinkStoredBlocksToFit(total_bytes);
return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
}
@@ -2238,11 +2264,13 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
{
size_t start_left_row = 0;
size_t start_right_block = 0;
std::unique_ptr<TemporaryFileStream::Reader> reader = nullptr;
if (not_processed)
{
auto & continuation = static_cast<NotProcessedCrossJoin &>(*not_processed);
start_left_row = continuation.left_position;
start_right_block = continuation.right_block;
reader = std::move(continuation.reader);
not_processed.reset();
}
@@ -2271,18 +2299,12 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
size_t rows_left = block.rows();
size_t rows_added = 0;
for (size_t left_row = start_left_row; left_row < rows_left; ++left_row)
{
size_t block_number = 0;
for (const Block & compressed_block_right : data->blocks)
auto process_right_block = [&](const Block & block_right)
{
++block_number;
if (block_number < start_right_block)
continue;
auto block_right = compressed_block_right.decompress();
size_t rows_right = block_right.rows();
rows_added += rows_right;
@@ -2294,6 +2316,44 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
const IColumn & column_right = *block_right.getByPosition(col_num).column;
dst_columns[num_existing_columns + col_num]->insertRangeFrom(column_right, 0, rows_right);
}
};
for (const Block & compressed_block_right : data->blocks)
{
++block_number;
if (block_number < start_right_block)
continue;
auto block_right = compressed_block_right.decompress();
process_right_block(block_right);
if (rows_added > max_joined_block_rows)
{
break;
}
}
if (tmp_stream && rows_added <= max_joined_block_rows)
{
if (reader == nullptr)
{
tmp_stream->finishWritingAsyncSafe();
reader = tmp_stream->getReadStream();
}
while (auto block_right = reader->read())
{
++block_number;
process_right_block(block_right);
if (rows_added > max_joined_block_rows)
{
break;
}
}
/// It means that reader->read() returned {}
if (rows_added <= max_joined_block_rows)
{
reader.reset();
}
}
start_right_block = 0;
@@ -2301,7 +2361,7 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
if (rows_added > max_joined_block_rows)
{
not_processed = std::make_shared<NotProcessedCrossJoin>(
NotProcessedCrossJoin{{block.cloneEmpty()}, left_row, block_number + 1});
NotProcessedCrossJoin{{block.cloneEmpty()}, left_row, block_number + 1, std::move(reader)});
not_processed->block.swap(block);
break;
}
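To summarize the change: for CROSS JOIN, once the accumulated right-side blocks would exceed `max_rows_in_join`/`max_bytes_in_join`, further blocks go to a temporary file stream instead of memory, and the probe phase scans the in-memory blocks first and then replays the spilled stream. A hypothetical Python sketch of that spill-then-replay pattern (all names invented for illustration):

```python
import pickle
import tempfile

class SpillingRightTable:
    """Illustration only: keep blocks in memory up to a row budget, spill the rest."""

    def __init__(self, max_rows_in_memory):
        self.max_rows = max_rows_in_memory
        self.blocks = []   # in-memory blocks, like data->blocks
        self.rows = 0
        self.tmp = None    # opened lazily, like tmp_stream in HashJoin

    def add_block(self, block):
        if self.rows + len(block) > self.max_rows:
            if self.tmp is None:
                self.tmp = tempfile.TemporaryFile()
            pickle.dump(block, self.tmp)  # spill to disk instead of keeping in memory
        else:
            self.blocks.append(block)
            self.rows += len(block)

    def iter_blocks(self):
        yield from self.blocks   # scan in-memory blocks first
        if self.tmp is not None:
            self.tmp.seek(0)     # then replay the spilled stream
            while True:
                try:
                    yield pickle.load(self.tmp)
                except EOFError:
                    break

table = SpillingRightTable(max_rows_in_memory=4)
for b in [[1, 2], [3, 4], [5, 6], [7]]:
    table.add_block(b)
print(list(table.iter_blocks()))  # [[1, 2], [3, 4], [5, 6], [7]]
```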


@@ -26,6 +26,7 @@
#include <Storages/IStorage_fwd.h>
#include <Interpreters/IKeyValueEntity.h>
#include <Interpreters/TemporaryDataOnDisk.h>
namespace DB
{
@@ -442,6 +443,10 @@ private:
RightTableDataPtr data;
std::vector<Sizes> key_sizes;
/// Needed to do external cross join
TemporaryDataOnDiskPtr tmp_data;
TemporaryFileStream* tmp_stream{nullptr};
/// Block with columns from the right-side table.
Block right_sample_block;
/// Block with columns from the right-side table except key columns.


@@ -310,7 +310,7 @@ std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & se
auto settings = context->getSettingsRef();
MultiEnum<JoinAlgorithm> join_algorithm = settings.join_algorithm;
bool try_use_direct_join = join_algorithm.isSet(JoinAlgorithm::DIRECT) || join_algorithm.isSet(JoinAlgorithm::DEFAULT);
auto table_join = std::make_shared<TableJoin>(settings, context->getGlobalTemporaryVolume());
auto table_join = std::make_shared<TableJoin>(settings, context->getGlobalTemporaryVolume(), context->getTempDataOnDisk());
const ASTTablesInSelectQueryElement * ast_join = select_query_.join();
const auto & table_to_join = ast_join->table_expression->as<ASTTableExpression &>();


@@ -103,7 +103,7 @@ bool forAllKeys(OnExpr & expressions, Func callback)
}
TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_)
TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_, TemporaryDataOnDiskScopePtr tmp_data_)
: size_limits(SizeLimits{settings.max_rows_in_join, settings.max_bytes_in_join, settings.join_overflow_mode})
, default_max_bytes(settings.default_max_bytes_in_join)
, join_use_nulls(settings.join_use_nulls)
@@ -117,6 +117,7 @@ TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_)
, temporary_files_codec(settings.temporary_files_codec)
, max_memory_usage(settings.max_memory_usage)
, tmp_volume(tmp_volume_)
, tmp_data(tmp_data_)
{
}


@@ -9,6 +9,7 @@
#include <QueryPipeline/SizeLimits.h>
#include <DataTypes/getLeastSupertype.h>
#include <Interpreters/IKeyValueEntity.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Common/Exception.h>
#include <Parsers/IAST_fwd.h>
@@ -188,6 +189,8 @@ private:
VolumePtr tmp_volume;
TemporaryDataOnDiskScopePtr tmp_data;
std::shared_ptr<StorageJoin> right_storage_join;
std::shared_ptr<const IKeyValueEntity> right_kv_storage;
@@ -233,7 +236,7 @@ private:
public:
TableJoin() = default;
TableJoin(const Settings & settings, VolumePtr tmp_volume_);
TableJoin(const Settings & settings, VolumePtr tmp_volume_, TemporaryDataOnDiskScopePtr tmp_data_);
/// for StorageJoin
TableJoin(SizeLimits limits, bool use_nulls, JoinKind kind, JoinStrictness strictness,
@@ -259,6 +262,8 @@ public:
VolumePtr getGlobalTemporaryVolume() { return tmp_volume; }
TemporaryDataOnDiskScopePtr getTempDataOnDisk() { return tmp_data; }
ActionsDAGPtr createJoinedBlockActions(ContextPtr context) const;
const std::vector<JoinAlgorithm> & getEnabledJoinAlgorithms() const { return join_algorithm; }


@@ -1,12 +1,11 @@
#include <atomic>
#include <mutex>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Compression/CompressedReadBuffer.h>
#include <Interpreters/Cache/FileCache.h>
#include <Formats/NativeWriter.h>
#include <Formats/NativeReader.h>
#include <Core/ProtocolDefines.h>
#include <Disks/SingleDiskVolume.h>
#include <Disks/DiskLocal.h>
@@ -14,6 +13,7 @@
#include <Core/Defines.h>
#include <Interpreters/Cache/WriteBufferToFileSegment.h>
#include "Common/Exception.h"
namespace ProfileEvents
{
@@ -224,33 +224,26 @@ struct TemporaryFileStream::OutputWriter
bool finalized = false;
};
struct TemporaryFileStream::InputReader
TemporaryFileStream::Reader::Reader(const String & path, const Block & header_, size_t size)
: in_file_buf(path, size ? std::min<size_t>(DBMS_DEFAULT_BUFFER_SIZE, size) : DBMS_DEFAULT_BUFFER_SIZE)
, in_compressed_buf(in_file_buf)
, in_reader(in_compressed_buf, header_, DBMS_TCP_PROTOCOL_VERSION)
{
InputReader(const String & path, const Block & header_, size_t size = 0)
: in_file_buf(path, size ? std::min<size_t>(DBMS_DEFAULT_BUFFER_SIZE, size) : DBMS_DEFAULT_BUFFER_SIZE)
, in_compressed_buf(in_file_buf)
, in_reader(in_compressed_buf, header_, DBMS_TCP_PROTOCOL_VERSION)
{
LOG_TEST(getLogger("TemporaryFileStream"), "Reading {} from {}", header_.dumpStructure(), path);
}
LOG_TEST(getLogger("TemporaryFileStream"), "Reading {} from {}", header_.dumpStructure(), path);
}
explicit InputReader(const String & path, size_t size = 0)
: in_file_buf(path, size ? std::min<size_t>(DBMS_DEFAULT_BUFFER_SIZE, size) : DBMS_DEFAULT_BUFFER_SIZE)
, in_compressed_buf(in_file_buf)
, in_reader(in_compressed_buf, DBMS_TCP_PROTOCOL_VERSION)
{
LOG_TEST(getLogger("TemporaryFileStream"), "Reading from {}", path);
}
TemporaryFileStream::Reader::Reader(const String & path, size_t size)
: in_file_buf(path, size ? std::min<size_t>(DBMS_DEFAULT_BUFFER_SIZE, size) : DBMS_DEFAULT_BUFFER_SIZE)
, in_compressed_buf(in_file_buf)
, in_reader(in_compressed_buf, DBMS_TCP_PROTOCOL_VERSION)
{
LOG_TEST(getLogger("TemporaryFileStream"), "Reading from {}", path);
}
Block read()
{
return in_reader.read();
}
ReadBufferFromFile in_file_buf;
CompressedReadBuffer in_compressed_buf;
NativeReader in_reader;
};
Block TemporaryFileStream::Reader::read()
{
return in_reader.read();
}
TemporaryFileStream::TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_)
: parent(parent_)
@@ -310,6 +303,12 @@ TemporaryFileStream::Stat TemporaryFileStream::finishWriting()
return stat;
}
TemporaryFileStream::Stat TemporaryFileStream::finishWritingAsyncSafe()
{
std::call_once(finish_writing, [this]{ finishWriting(); });
return stat;
}
bool TemporaryFileStream::isWriteFinished() const
{
assert(in_reader == nullptr || out_writer == nullptr);
@@ -326,7 +325,7 @@ Block TemporaryFileStream::read()
if (!in_reader)
{
in_reader = std::make_unique<InputReader>(getPath(), header, getSize());
in_reader = std::make_unique<Reader>(getPath(), header, getSize());
}
Block block = in_reader->read();
@@ -338,6 +337,17 @@ Block TemporaryFileStream::read()
return block;
}
std::unique_ptr<TemporaryFileStream::Reader> TemporaryFileStream::getReadStream()
{
if (!isWriteFinished())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing has not been finished");
if (isEof())
return nullptr;
return std::make_unique<Reader>(getPath(), header, getSize());
}
void TemporaryFileStream::updateAllocAndCheck()
{
assert(out_writer);
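`finishWritingAsyncSafe` relies on `std::call_once` so that concurrent readers can race to finalize the stream while `finishWriting` still runs exactly once. A hedged Python analogue of that guard:

```python
import threading

class Stream:
    def __init__(self):
        self._once = threading.Lock()
        self._finished = False

    def _finish_writing(self):
        print("finalizing")  # stands in for the real flush/finalize work

    def finish_writing_async_safe(self):
        with self._once:            # serializes concurrent callers
            if not self._finished:  # body runs at most once, like std::call_once
                self._finish_writing()
                self._finished = True
```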


@@ -1,7 +1,12 @@
#pragma once
#include <atomic>
#include <mutex>
#include <boost/noncopyable.hpp>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <Formats/NativeReader.h>
#include <Core/Block.h>
#include <Disks/IVolume.h>
#include <Disks/TemporaryFileOnDisk.h>
@@ -132,12 +137,25 @@ private:
/*
* Data can be written into this stream and then read.
* After finish writing, call `finishWriting` and then `read` to read the data.
* After writing is finished, call `finishWriting` and then call either `read` or `getReadStream` (only one of the two) to read the data.
* Accounts the amount of data written to disk in the parent scope.
*/
class TemporaryFileStream : boost::noncopyable
{
public:
struct Reader
{
Reader(const String & path, const Block & header_, size_t size = 0);
explicit Reader(const String & path, size_t size = 0);
Block read();
ReadBufferFromFile in_file_buf;
CompressedReadBuffer in_compressed_buf;
NativeReader in_reader;
};
struct Stat
{
/// Statistics for file
@@ -154,8 +172,11 @@ public:
void flush();
Stat finishWriting();
Stat finishWritingAsyncSafe();
bool isWriteFinished() const;
std::unique_ptr<Reader> getReadStream();
Block read();
String getPath() const;
@@ -184,11 +205,12 @@ private:
Stat stat;
std::once_flag finish_writing;
struct OutputWriter;
std::unique_ptr<OutputWriter> out_writer;
struct InputReader;
std::unique_ptr<InputReader> in_reader;
std::unique_ptr<Reader> in_reader;
};
}


@@ -51,7 +51,7 @@ public:
{
/// We allow to not hide type of the disk, e.g. disk(type = s3, ...)
/// and also nested disk, e.g. disk = 'disk_name'
return arg_name != "type" && arg_name != "disk";
return arg_name != "type" && arg_name != "disk" && arg_name != "name";
};
for (const auto & arg : disk_function_args)


@@ -1207,7 +1207,7 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(const QueryTreeNodePtr & join_table_
}
}
auto table_join = std::make_shared<TableJoin>(settings, query_context->getGlobalTemporaryVolume());
auto table_join = std::make_shared<TableJoin>(settings, query_context->getGlobalTemporaryVolume(), query_context->getTempDataOnDisk());
table_join->getTableJoin() = join_node.toASTTableJoin()->as<ASTTableJoin &>();
if (join_constant)


@@ -0,0 +1,269 @@
#include <Processors/Formats/Impl/NpyOutputFormat.h>
#include <Core/TypeId.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeArray.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnArray.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <Formats/FormatFactory.h>
#include <Common/assert_cast.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_MANY_COLUMNS;
extern const int BAD_ARGUMENTS;
extern const int ILLEGAL_COLUMN;
}
namespace
{
template <typename ColumnType, typename ValueType>
void writeNumpyNumbers(const ColumnPtr & column, WriteBuffer & buf)
{
const auto * number_column = assert_cast<const ColumnType *>(column.get());
for (size_t i = 0; i < number_column->size(); ++i)
writeBinaryLittleEndian(ValueType(number_column->getElement(i)), buf);
}
template <typename ColumnType>
void writeNumpyStrings(const ColumnPtr & column, size_t length, WriteBuffer & buf)
{
const auto * string_column = assert_cast<const ColumnType *>(column.get());
for (size_t i = 0; i < string_column->size(); ++i)
{
auto data = string_column->getDataAt(i);
buf.write(data.data, data.size);
writeChar(0, length - data.size, buf);
}
}
}
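`writeNumpyStrings` pads every value with zero bytes up to the common field width, which is exactly how NumPy's fixed-width `S` dtype stores shorter strings. A quick check:

```python
import numpy as np

a = np.array([b"npy", b"npynpy"], dtype="S6")
print(a.tobytes())  # b'npy\x00\x00\x00npynpy' -- shorter entries are NUL-padded
```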
String NpyOutputFormat::shapeStr() const
{
WriteBufferFromOwnString shape;
writeIntText(num_rows, shape);
writeChar(',', shape);
for (UInt64 dim : numpy_shape)
{
writeIntText(dim, shape);
writeChar(',', shape);
}
return shape.str();
}
NpyOutputFormat::NpyOutputFormat(WriteBuffer & out_, const Block & header_) : IOutputFormat(header_, out_)
{
const auto & header = getPort(PortKind::Main).getHeader();
auto data_types = header.getDataTypes();
if (data_types.size() > 1)
throw Exception(ErrorCodes::TOO_MANY_COLUMNS, "Expected single column for Npy output format, got {}", data_types.size());
data_type = data_types[0];
if (!getNumpyDataType(data_type))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Type {} is not supported for Npy output format", nested_data_type->getName());
}
bool NpyOutputFormat::getNumpyDataType(const DataTypePtr & type)
{
switch (type->getTypeId())
{
case TypeIndex::Int8:
numpy_data_type = std::make_shared<NumpyDataTypeInt>(NumpyDataType::Endianness::LITTLE, sizeof(Int8), true);
break;
case TypeIndex::Int16:
numpy_data_type = std::make_shared<NumpyDataTypeInt>(NumpyDataType::Endianness::LITTLE, sizeof(Int16), true);
break;
case TypeIndex::Int32:
numpy_data_type = std::make_shared<NumpyDataTypeInt>(NumpyDataType::Endianness::LITTLE, sizeof(Int32), true);
break;
case TypeIndex::Int64:
numpy_data_type = std::make_shared<NumpyDataTypeInt>(NumpyDataType::Endianness::LITTLE, sizeof(Int64), true);
break;
case TypeIndex::UInt8:
numpy_data_type = std::make_shared<NumpyDataTypeInt>(NumpyDataType::Endianness::LITTLE, sizeof(UInt8), false);
break;
case TypeIndex::UInt16:
numpy_data_type = std::make_shared<NumpyDataTypeInt>(NumpyDataType::Endianness::LITTLE, sizeof(UInt16), false);
break;
case TypeIndex::UInt32:
numpy_data_type = std::make_shared<NumpyDataTypeInt>(NumpyDataType::Endianness::LITTLE, sizeof(UInt32), false);
break;
case TypeIndex::UInt64:
numpy_data_type = std::make_shared<NumpyDataTypeInt>(NumpyDataType::Endianness::LITTLE, sizeof(UInt64), false);
break;
case TypeIndex::Float32:
numpy_data_type = std::make_shared<NumpyDataTypeFloat>(NumpyDataType::Endianness::LITTLE, sizeof(Float32));
break;
case TypeIndex::Float64:
numpy_data_type = std::make_shared<NumpyDataTypeFloat>(NumpyDataType::Endianness::LITTLE, sizeof(Float64));
break;
case TypeIndex::FixedString:
numpy_data_type = std::make_shared<NumpyDataTypeString>(
NumpyDataType::Endianness::NONE, assert_cast<const DataTypeFixedString *>(type.get())->getN());
break;
case TypeIndex::String:
numpy_data_type = std::make_shared<NumpyDataTypeString>(NumpyDataType::Endianness::NONE, 0);
break;
case TypeIndex::Array:
return getNumpyDataType(assert_cast<const DataTypeArray *>(type.get())->getNestedType());
default:
nested_data_type = type;
return false;
}
nested_data_type = type;
return true;
}
void NpyOutputFormat::consume(Chunk chunk)
{
if (!invalid_shape)
{
num_rows += chunk.getNumRows();
const auto & column = chunk.getColumns()[0];
if (!is_initialized)
{
initShape(column);
is_initialized = true;
}
ColumnPtr nested_column = column;
checkShape(nested_column);
updateSizeIfTypeString(nested_column);
columns.push_back(nested_column);
}
}
void NpyOutputFormat::initShape(const ColumnPtr & column)
{
ColumnPtr nested_column = column;
while (const auto * array_column = typeid_cast<const ColumnArray *>(nested_column.get()))
{
auto dim = array_column->getOffsets()[0];
invalid_shape = dim == 0;
numpy_shape.push_back(dim);
nested_column = array_column->getDataPtr();
}
if (invalid_shape)
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Shape ({}) is invalid, as dimension size cannot be 0", shapeStr());
}
void NpyOutputFormat::checkShape(ColumnPtr & column)
{
int dim = 0;
while (const auto * array_column = typeid_cast<const ColumnArray *>(column.get()))
{
const auto & array_offset = array_column->getOffsets();
for (size_t i = 0; i < array_offset.size(); ++i)
if (array_offset[i] - array_offset[i - 1] != numpy_shape[dim])
{
invalid_shape = true;
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "ClickHouse doesn't support object types, cannot format ragged nested sequences (which is a list of arrays with different shapes)");
}
column = array_column->getDataPtr();
dim += 1;
}
}
void NpyOutputFormat::updateSizeIfTypeString(const ColumnPtr & column)
{
if (nested_data_type->getTypeId() == TypeIndex::String)
{
const auto & string_offsets = assert_cast<const ColumnString *>(column.get())->getOffsets();
for (size_t i = 0; i < string_offsets.size(); ++i)
{
size_t string_length = static_cast<size_t>(string_offsets[i] - 1 - string_offsets[i - 1]);
if (numpy_data_type->getSize() < string_length)
numpy_data_type->setSize(string_length);
}
}
}
void NpyOutputFormat::finalizeImpl()
{
if (!invalid_shape)
{
writeHeader();
writeColumns();
}
}
void NpyOutputFormat::writeHeader()
{
String dict = "{'descr':'" + numpy_data_type->str() + "','fortran_order':False,'shape':(" + shapeStr() + "),}";
String padding = "\n";
/// Pads the header with spaces so that its total length is divisible by 64.
size_t dict_length = dict.length() + 1;
size_t header_length = STATIC_HEADER_LENGTH + sizeof(UInt32) + dict_length;
if (header_length % 64)
{
header_length = ((header_length / 64) + 1) * 64;
dict_length = header_length - STATIC_HEADER_LENGTH - sizeof(UInt32);
padding = std::string(dict_length - dict.length(), '\x20');
padding.back() = '\n';
}
out.write(STATIC_HEADER, STATIC_HEADER_LENGTH);
writeBinaryLittleEndian(static_cast<UInt32>(dict_length), out);
out.write(dict.data(), dict.length());
out.write(padding.data(), padding.length());
}
void NpyOutputFormat::writeColumns()
{
for (const auto & column : columns)
{
switch (nested_data_type->getTypeId())
{
case TypeIndex::Int8: writeNumpyNumbers<ColumnInt8, Int8>(column, out); break;
case TypeIndex::Int16: writeNumpyNumbers<ColumnInt16, Int16>(column, out); break;
case TypeIndex::Int32: writeNumpyNumbers<ColumnInt32, Int32>(column, out); break;
case TypeIndex::Int64: writeNumpyNumbers<ColumnInt64, Int64>(column, out); break;
case TypeIndex::UInt8: writeNumpyNumbers<ColumnUInt8, UInt8>(column, out); break;
case TypeIndex::UInt16: writeNumpyNumbers<ColumnUInt16, UInt16>(column, out); break;
case TypeIndex::UInt32: writeNumpyNumbers<ColumnUInt32, UInt32>(column, out); break;
case TypeIndex::UInt64: writeNumpyNumbers<ColumnUInt64, UInt64>(column, out); break;
case TypeIndex::Float32: writeNumpyNumbers<ColumnFloat32, Float32>(column, out); break;
case TypeIndex::Float64: writeNumpyNumbers<ColumnFloat64, Float64>(column, out); break;
case TypeIndex::FixedString:
writeNumpyStrings<ColumnFixedString>(column, numpy_data_type->getSize(), out);
break;
case TypeIndex::String:
writeNumpyStrings<ColumnString>(column, numpy_data_type->getSize(), out);
break;
default:
break;
}
}
}
void registerOutputFormatNpy(FormatFactory & factory)
{
factory.registerOutputFormat("Npy",[](
WriteBuffer & buf,
const Block & sample,
const FormatSettings &)
{
return std::make_shared<NpyOutputFormat>(buf, sample);
});
factory.markFormatHasNoAppendSupport("Npy");
}
}
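Putting `writeHeader` together: the output starts with the 8-byte magic/version prefix, then a little-endian `UInt32` header length, then the Python-literal dict padded with spaces (and terminated by `'\n'`) so the whole header is a multiple of 64 bytes; the trailing comma from `shapeStr` keeps one-dimensional shapes valid tuples such as `(2,)`. A sketch that parses such a header back (the path is hypothetical):

```python
import struct

with open("data.npy", "rb") as f:
    assert f.read(6) == b"\x93NUMPY"           # magic
    major, minor = f.read(1)[0], f.read(1)[0]  # version bytes, 3 and 0 here
    (hlen,) = struct.unpack("<I", f.read(4))   # little-endian UInt32 dict length
    header = f.read(hlen).decode()
    print(major, minor, header)
    # e.g. 3 0 {'descr':'<i8','fortran_order':False,'shape':(2,),}
    assert (8 + 4 + hlen) % 64 == 0            # magic+version (8) + length field (4) + dict
```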


@@ -0,0 +1,60 @@
#pragma once
#include <Core/Block.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferFromVector.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Formats/FormatSettings.h>
#include <Formats/NumpyDataTypes.h>
#include <Columns/IColumn.h>
#include <Common/PODArray_fwd.h>
#include <vector>
#include <string>
namespace DB
{
/** Stream for output data in Npy format.
* https://numpy.org/doc/stable/reference/generated/numpy.lib.format.html
*/
class NpyOutputFormat : public IOutputFormat
{
public:
NpyOutputFormat(WriteBuffer & out_, const Block & header_);
String getName() const override { return "NpyOutputFormat"; }
String getContentType() const override { return "application/octet-stream"; }
private:
String shapeStr() const;
bool getNumpyDataType(const DataTypePtr & type);
void consume(Chunk) override;
void initShape(const ColumnPtr & column);
void checkShape(ColumnPtr & column);
void updateSizeIfTypeString(const ColumnPtr & column);
void finalizeImpl() override;
void writeHeader();
void writeColumns();
bool is_initialized = false;
bool invalid_shape = false;
DataTypePtr data_type;
DataTypePtr nested_data_type;
std::shared_ptr<NumpyDataType> numpy_data_type;
UInt64 num_rows = 0;
std::vector<UInt64> numpy_shape;
Columns columns;
/// static header (version 3.0)
constexpr static auto STATIC_HEADER = "\x93NUMPY\x03\x00";
constexpr static size_t STATIC_HEADER_LENGTH = 8;
};
}


@@ -579,8 +579,6 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
auto candidates = getAggregateProjectionCandidates(node, *aggregating, *reading, max_added_blocks, allow_implicit_projections);
const auto & parts = reading->getParts();
const auto & alter_conversions = reading->getAlterConvertionsForParts();
const auto & query_info = reading->getQueryInfo();
const auto metadata = reading->getStorageMetadata();
ContextPtr context = reading->getContext();
@@ -592,7 +590,7 @@
}
else if (!candidates.real.empty())
{
auto ordinary_reading_select_result = reading->selectRangesToRead(parts, alter_conversions);
auto ordinary_reading_select_result = reading->selectRangesToRead();
size_t ordinary_reading_marks = ordinary_reading_select_result->selected_marks;
/// Nothing to read. Ignore projections.


@@ -136,12 +136,10 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
NormalProjectionCandidate * best_candidate = nullptr;
const Names & required_columns = reading->getAllColumnNames();
const auto & parts = reading->getParts();
const auto & alter_conversions = reading->getAlterConvertionsForParts();
const auto & query_info = reading->getQueryInfo();
MergeTreeDataSelectExecutor reader(reading->getMergeTreeData());
auto ordinary_reading_select_result = reading->selectRangesToRead(parts, alter_conversions);
auto ordinary_reading_select_result = reading->selectRangesToRead();
size_t ordinary_reading_marks = ordinary_reading_select_result->selected_marks;
/// Nothing to read. Ignore projections.


@@ -1364,11 +1364,27 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
return merging_pipes.empty() ? Pipe::unitePipes(std::move(no_merging_pipes)) : Pipe::unitePipes(std::move(merging_pipes));
}
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead() const
{
return selectRangesToReadImpl(
prepared_parts,
alter_conversions_for_parts,
metadata_for_reading,
query_info,
context,
requested_num_streams,
max_block_numbers_to_read,
data,
all_column_names,
log,
indexes);
}
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
MergeTreeData::DataPartsVector parts,
std::vector<AlterConversionsPtr> alter_conversions) const
{
return selectRangesToRead(
return selectRangesToReadImpl(
std::move(parts),
std::move(alter_conversions),
metadata_for_reading,
@@ -1855,10 +1871,7 @@ bool ReadFromMergeTree::requestOutputEachPartitionThroughSeparatePort()
ReadFromMergeTree::AnalysisResult ReadFromMergeTree::getAnalysisResult() const
{
auto result_ptr = analyzed_result_ptr
? analyzed_result_ptr
: selectRangesToRead(prepared_parts, alter_conversions_for_parts);
auto result_ptr = analyzed_result_ptr ? analyzed_result_ptr : selectRangesToRead();
return *result_ptr;
}


@@ -167,6 +167,8 @@ public:
MergeTreeData::DataPartsVector parts,
std::vector<AlterConversionsPtr> alter_conversions) const;
AnalysisResultPtr selectRangesToRead() const;
StorageMetadataPtr getStorageMetadata() const { return metadata_for_reading; }
/// Returns `false` if requested reading cannot be performed.


@@ -2,7 +2,6 @@
#include <cstddef>
#include <deque>
#include <set>
#include <fmt/core.h>
#include <fmt/format.h>


@@ -981,10 +981,7 @@ void ParallelReplicasReadingCoordinator::handleInitialAllRangesAnnouncement(Init
std::lock_guard lock(mutex);
if (!pimpl)
{
mode = announcement.mode;
initialize();
}
initialize(announcement.mode);
pimpl->handleInitialAllRangesAnnouncement(std::move(announcement));
}
@@ -996,10 +993,7 @@ ParallelReadResponse ParallelReplicasReadingCoordinator::handleRequest(ParallelR
std::lock_guard lock(mutex);
if (!pimpl)
{
mode = request.mode;
initialize();
}
initialize(request.mode);
const auto replica_num = request.replica_num;
auto response = pimpl->handleRequest(std::move(request));
@@ -1024,7 +1018,7 @@ void ParallelReplicasReadingCoordinator::markReplicaAsUnavailable(size_t replica
pimpl->markReplicaAsUnavailable(replica_number);
}
void ParallelReplicasReadingCoordinator::initialize()
void ParallelReplicasReadingCoordinator::initialize(CoordinationMode mode)
{
switch (mode)
{


@@ -31,12 +31,11 @@ public:
void setProgressCallback(ProgressCallback callback);
private:
void initialize();
void initialize(CoordinationMode mode);
std::mutex mutex;
size_t replicas_count{0};
size_t mark_segment_size{0};
CoordinationMode mode{CoordinationMode::Default};
std::unique_ptr<ImplInterface> pimpl;
ProgressCallback progress_callback; // store the callback only to bypass it to coordinator implementation
std::set<size_t> replicas_used;


@@ -751,6 +751,7 @@ class SettingsRandomizer:
"max_read_buffer_size": lambda: random.randint(500000, 1048576),
"prefer_localhost_replica": lambda: random.randint(0, 1),
"max_block_size": lambda: random.randint(8000, 100000),
"max_joined_block_size_rows": lambda: random.randint(8000, 100000),
"max_threads": lambda: random.randint(1, 64),
"optimize_append_index": lambda: random.randint(0, 1),
"optimize_if_chain_to_multiif": lambda: random.randint(0, 1),


@@ -254,6 +254,7 @@ def test_merge_tree_custom_disk_setting(start_cluster):
ORDER BY tuple()
SETTINGS
disk = disk(
name='test_name',
type=s3,
endpoint='http://minio1:9001/root/data2/',
access_key_id='minio',
@@ -262,7 +263,7 @@
)
expected = """
SETTINGS disk = disk(type = s3, endpoint = \\'[HIDDEN]\\', access_key_id = \\'[HIDDEN]\\', secret_access_key = \\'[HIDDEN]\\'), index_granularity = 8192
SETTINGS disk = disk(name = \\'test_name\\', type = s3, endpoint = \\'[HIDDEN]\\', access_key_id = \\'[HIDDEN]\\', secret_access_key = \\'[HIDDEN]\\'), index_granularity = 8192
"""
assert expected.strip() in node1.query(f"SHOW CREATE TABLE {TABLE_NAME}_4").strip()


@@ -38,7 +38,7 @@ def run_test(data_format, gen_data_template, settings):
formats = (
client.query(
"SELECT name FROM system.formats WHERE is_input AND is_output \
AND name NOT IN ('CapnProto', 'RawBLOB', 'Template', 'ProtobufSingle', 'LineAsString', 'Protobuf', 'ProtobufList') ORDER BY name"
AND name NOT IN ('CapnProto', 'RawBLOB', 'Template', 'ProtobufSingle', 'LineAsString', 'Protobuf', 'ProtobufList', 'Npy') ORDER BY name"
)
.strip()
.split("\n")


@@ -108,3 +108,15 @@ ARRAY JOIN value_array_array_inner_element AS value_array_array_inner_inner_elem
0 Value [[1,2,3],[4,5,6]] [4,5,6] [4,5,6] 4
0 Value [[1,2,3],[4,5,6]] [4,5,6] [4,5,6] 5
0 Value [[1,2,3],[4,5,6]] [4,5,6] [4,5,6] 6
SELECT '--';
--
SELECT 1 FROM system.one ARRAY JOIN arrayMap(x -> ignore(*), []);
SELECT arrayFilter(x -> notEmpty(concat(x, 'hello')), [''])
FROM system.one
ARRAY JOIN
[0] AS elem,
arrayMap(x -> concat(x, ignore(ignore(toLowCardinality('03147_parquet_memory_tracking.parquet'), 37, 37, toUInt128(37), 37, 37, toLowCardinality(37), 37), 8, ignore(ignore(1., 36, 8, 8)), *), 'hello'), ['']) AS unused
WHERE NOT ignore(elem)
GROUP BY
sum(ignore(ignore(ignore(1., 1, 36, 8, 8), ignore(52, 37, 37, '03147_parquet_memory_tracking.parquet', 37, 37, toUInt256(37), 37, 37, toNullable(37), 37, 37), 1., 1, 36, 8, 8), emptyArrayToSingle(arrayMap(x -> toString(x), arrayMap(x -> nullIf(x, 2), arrayJoin([[1]])))))) IGNORE NULLS,
modulo(toLowCardinality('03147_parquet_memory_tracking.parquet'), number, toLowCardinality(3)); -- { serverError UNKNOWN_IDENTIFIER }


@@ -65,6 +65,18 @@ SELECT id, value, value_array_array, value_array_array_inner_element, value_arra
FROM test_table ARRAY JOIN value_array_array AS value_array_array_inner_element
ARRAY JOIN value_array_array_inner_element AS value_array_array_inner_inner_element;
SELECT '--';
SELECT 1 FROM system.one ARRAY JOIN arrayMap(x -> ignore(*), []);
SELECT arrayFilter(x -> notEmpty(concat(x, 'hello')), [''])
FROM system.one
ARRAY JOIN
[0] AS elem,
arrayMap(x -> concat(x, ignore(ignore(toLowCardinality('03147_parquet_memory_tracking.parquet'), 37, 37, toUInt128(37), 37, 37, toLowCardinality(37), 37), 8, ignore(ignore(1., 36, 8, 8)), *), 'hello'), ['']) AS unused
WHERE NOT ignore(elem)
GROUP BY
sum(ignore(ignore(ignore(1., 1, 36, 8, 8), ignore(52, 37, 37, '03147_parquet_memory_tracking.parquet', 37, 37, toUInt256(37), 37, 37, toNullable(37), 37, 37), 1., 1, 36, 8, 8), emptyArrayToSingle(arrayMap(x -> toString(x), arrayMap(x -> nullIf(x, 2), arrayJoin([[1]])))))) IGNORE NULLS,
modulo(toLowCardinality('03147_parquet_memory_tracking.parquet'), number, toLowCardinality(3)); -- { serverError UNKNOWN_IDENTIFIER }
-- { echoOff }
DROP TABLE test_table;


@@ -0,0 +1,48 @@
-- test data types --
-1
1
-1
1
-1
1
-1
1
0
1
0
1
0
1
0
1
0.2
0.1
0.02
0.01
npy
npy
npynpy
npy
array Int8
array Int16
array Int32
array Int64
array UInt8
array UInt16
array UInt32
array UInt64
array Float32
array Float64
array String
array String
-- test nested data types --
[[[1],[2]],[[3],[4]]]
[[[1],[2]],[[3],[4]]]
[[0.1],[0.2]]
[[0.1],[0.2]]
['a','bb']
['ccc','dddd']
array Array(Array(Array(Int8)))
array Array(Array(Float64))
array Array(String)
-- test exceptions --


@@ -0,0 +1,118 @@
#!/usr/bin/env bash
# Tags: no-parallel
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
user_files_path=$($CLICKHOUSE_CLIENT_BINARY -q "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
mkdir -p ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/
rm -rf ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME:?}/*
chmod 777 ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/
${CLICKHOUSE_CLIENT} -n -q --ignore-error "
DROP DATABASE IF EXISTS npy_output_02895;
CREATE DATABASE IF NOT EXISTS npy_output_02895;
SELECT '-- test data types --';
CREATE TABLE IF NOT EXISTS npy_output_02895.data_types
(
i1 Int8,
i2 Int16,
i4 Int32,
i8 Int64,
u1 UInt8,
u2 UInt16,
u4 UInt32,
u8 UInt64,
f4 Float32,
f8 Float64,
fs FixedString(10),
s String
) Engine = MergeTree ORDER BY i1;
INSERT INTO npy_output_02895.data_types VALUES (1, 1, 1, 1, 1, 1, 1, 1, 0.1, 0.01, 'npy', 'npy'), (-1, -1, -1, -1, 0, 0, 0, 0, 0.2, 0.02, 'npy', 'npynpy');
INSERT INTO TABLE FUNCTION file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_int8.npy') SELECT i1 FROM npy_output_02895.data_types;
INSERT INTO TABLE FUNCTION file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_int16.npy') SELECT i2 FROM npy_output_02895.data_types;
INSERT INTO TABLE FUNCTION file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_int32.npy') SELECT i4 FROM npy_output_02895.data_types;
INSERT INTO TABLE FUNCTION file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_int64.npy') SELECT i8 FROM npy_output_02895.data_types;
INSERT INTO TABLE FUNCTION file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_uint8.npy') SELECT u1 FROM npy_output_02895.data_types;
INSERT INTO TABLE FUNCTION file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_uint16.npy') SELECT u2 FROM npy_output_02895.data_types;
INSERT INTO TABLE FUNCTION file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_uint32.npy') SELECT u4 FROM npy_output_02895.data_types;
INSERT INTO TABLE FUNCTION file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_uint64.npy') SELECT u8 FROM npy_output_02895.data_types;
INSERT INTO TABLE FUNCTION file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_float32.npy') SELECT f4 FROM npy_output_02895.data_types;
INSERT INTO TABLE FUNCTION file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_float64.npy') SELECT f8 FROM npy_output_02895.data_types;
INSERT INTO TABLE FUNCTION file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_fixedstring.npy') SELECT fs FROM npy_output_02895.data_types;
INSERT INTO TABLE FUNCTION file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_string.npy') SELECT s FROM npy_output_02895.data_types;
SELECT * FROM file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_int8.npy');
SELECT * FROM file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_int16.npy');
SELECT * FROM file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_int32.npy');
SELECT * FROM file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_int64.npy');
SELECT * FROM file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_uint8.npy');
SELECT * FROM file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_uint16.npy');
SELECT * FROM file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_uint32.npy');
SELECT * FROM file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_uint64.npy');
SELECT * FROM file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_float32.npy');
SELECT * FROM file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_float64.npy');
SELECT * FROM file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_fixedstring.npy');
SELECT * FROM file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_string.npy');
DESC file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_int8.npy');
DESC file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_int16.npy');
DESC file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_int32.npy');
DESC file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_int64.npy');
DESC file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_uint8.npy');
DESC file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_uint16.npy');
DESC file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_uint32.npy');
DESC file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_uint64.npy');
DESC file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_float32.npy');
DESC file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_float64.npy');
DESC file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_fixedstring.npy');
DESC file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_dtype_string.npy');
SELECT '-- test nested data types --';
CREATE TABLE IF NOT EXISTS npy_output_02895.nested_data_types
(
i4 Array(Array(Array(Int8))),
f8 Array(Array(Float64)),
s Array(String),
) Engine = MergeTree ORDER BY i4;
INSERT INTO npy_output_02895.nested_data_types VALUES ([[[1], [2]], [[3], [4]]], [[0.1], [0.2]], ['a', 'bb']), ([[[1], [2]], [[3], [4]]], [[0.1], [0.2]], ['ccc', 'dddd']);
INSERT INTO TABLE FUNCTION file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_nested_dtype_int32.npy') SELECT i4 FROM npy_output_02895.nested_data_types;
INSERT INTO TABLE FUNCTION file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_nested_dtype_float64.npy') SELECT f8 FROM npy_output_02895.nested_data_types;
INSERT INTO TABLE FUNCTION file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_nested_dtype_string.npy') SELECT s FROM npy_output_02895.nested_data_types;
SELECT * FROM file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_nested_dtype_int32.npy');
SELECT * FROM file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_nested_dtype_float64.npy');
SELECT * FROM file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_nested_dtype_string.npy');
DESC file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_nested_dtype_int32.npy');
DESC file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_nested_dtype_float64.npy');
DESC file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/02895_nested_dtype_string.npy');
SELECT '-- test exceptions --';
CREATE TABLE IF NOT EXISTS npy_output_02895.exceptions
(
unsupported_u UInt256,
unsupported_date Date,
unsupported_tuple Tuple(Int16, Int16),
unsupported_nested_i Array(Int128),
ragged_dimension Array(Int16),
zero_dimension Array(Int16)
) Engine = MergeTree ORDER BY unsupported_u;
INSERT INTO npy_output_02895.exceptions VALUES (1, '2019-01-01', (1, 1), [1, 1], [1, 1], []), (0, '2019-01-01', (0, 0), [0, 0], [0], [0]);
SELECT * FROM npy_output_02895.exceptions FORMAT Npy; -- { clientError TOO_MANY_COLUMNS }
SELECT unsupported_u FROM npy_output_02895.exceptions FORMAT Npy; -- { clientError BAD_ARGUMENTS }
SELECT unsupported_date FROM npy_output_02895.exceptions FORMAT Npy; -- { clientError BAD_ARGUMENTS }
SELECT unsupported_tuple FROM npy_output_02895.exceptions FORMAT Npy; -- { clientError BAD_ARGUMENTS }
SELECT unsupported_nested_i FROM npy_output_02895.exceptions FORMAT Npy; -- { clientError BAD_ARGUMENTS }
SELECT ragged_dimension FROM npy_output_02895.exceptions FORMAT Npy; -- { clientError ILLEGAL_COLUMN }
SELECT zero_dimension FROM npy_output_02895.exceptions FORMAT Npy; -- { clientError ILLEGAL_COLUMN }
DROP DATABASE IF EXISTS npy_output_02895;"
rm -rf ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME:?}


@@ -7,7 +7,7 @@ CLICKHOUSE_LOG_COMMENT=
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
CH_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_variant_type=1 --allow_suspicious_variant_types=1"
CH_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_variant_type=1 --allow_suspicious_variant_types=1 --index_granularity_bytes=10485760 --index_granularity=8192"
function test1_insert()
{


@@ -7,7 +7,7 @@ CLICKHOUSE_LOG_COMMENT=
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
CH_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_variant_type=1 --allow_suspicious_variant_types=1"
CH_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_variant_type=1 --allow_suspicious_variant_types=1 --index_granularity_bytes=10485760 --index_granularity=8192"
function test4_insert()
{


@@ -7,7 +7,7 @@ CLICKHOUSE_LOG_COMMENT=
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
CH_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_variant_type=1 --allow_suspicious_variant_types=1"
CH_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_variant_type=1 --allow_suspicious_variant_types=1 --index_granularity_bytes=10485760 --index_granularity=8192 "
function test5_insert()
{


@@ -7,7 +7,8 @@ CLICKHOUSE_LOG_COMMENT=
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
CH_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_variant_type=1 --allow_suspicious_variant_types=1"
CH_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_variant_type=1 --allow_suspicious_variant_types=1 --index_granularity_bytes=10485760 --index_granularity=8192 "
function test6_insert()
{


@@ -0,0 +1,14 @@
-- https://github.com/ClickHouse/ClickHouse/issues/48049
SET allow_experimental_analyzer = 1;
CREATE TABLE test_table (`id` UInt64, `value` String) ENGINE = TinyLog() AS Select number, number::String from numbers(10);
WITH CAST(tuple(1), 'Tuple (value UInt64)') AS compound_value
SELECT id, test_table.* APPLY x -> compound_value.*
FROM test_table
WHERE arrayMap(x -> toString(x) AS lambda, [NULL, 256, 257, NULL, NULL])
SETTINGS convert_query_to_cnf = true, optimize_using_constraints = true, optimize_substitute_columns = true; -- { serverError ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER }
DESCRIBE TABLE (SELECT test_table.COLUMNS(id) FROM test_table WHERE '2147483647'); -- { serverError ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER }
DROP TABLE test_table;


@@ -39,6 +39,7 @@ $CLICKHOUSE_CLIENT -nm -q "
set send_logs_level='fatal';
drop table tp_1;
restore table tp_1 from Disk('backups', '$backup_id');
system stop merges tp_1;
" | grep -o "RESTORED"
$CLICKHOUSE_CLIENT -q "select count() from tp_1;"


@@ -0,0 +1,8 @@
CREATE TABLE users (uid Int16, name String, age Int16) ENGINE=MergeTree ORDER BY tuple();
INSERT INTO users VALUES (1231, 'John', 33);
INSERT INTO users VALUES (6666, 'Ksenia', 48);
INSERT INTO users VALUES (8888, 'Alice', 50);
SELECT count(*) OVER w
FROM users WINDOW w AS (ORDER BY uid), w AS (ORDER BY name); -- { serverError BAD_ARGUMENTS }


@@ -0,0 +1,6 @@
1 1 2
10 55 11
100 5050 101
1000 500500 1001
10000 50005000 10001
100000 5000050000 100001


@@ -0,0 +1,20 @@
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (x Int32) ENGINE = Memory;
-- insert several blocks with 1 or 2 rows:
INSERT INTO t1 VALUES (1);
INSERT INTO t1 VALUES (10),(100);
INSERT INTO t1 VALUES (1000);
INSERT INTO t1 VALUES (10000),(100000);
SET max_rows_in_join = 111;
SELECT x, sum(number), count(), FROM (
SELECT t1.x, t2.number
FROM t1
CROSS JOIN numbers_mt(10_000_000) t2
WHERE number <= x
)
GROUP BY ALL
ORDER BY x
;
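The expected values in the companion `.reference` file above follow directly from the predicate: for each `x`, the numbers `0..x` survive, so the sum is `x(x+1)/2` and the count is `x+1`. A quick reproduction of the expected output:

```python
for x in [1, 10, 100, 1000, 10000, 100000]:
    print(x, x * (x + 1) // 2, x + 1)
```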


@@ -10,21 +10,22 @@ HEADER = """<!--
the file is autogenerated by utils/security-generator/generate_security.py
-->
# Security Policy
# ClickHouse Security Vulnerability Response Policy
## Security Announcements
Security fixes will be announced by posting them in the [security changelog](https://clickhouse.com/docs/en/whats-new/security-changelog/).
## Security Change Log and Support
## Scope and Supported Versions
Details regarding security fixes are publicly reported in our [security changelog](https://clickhouse.com/docs/en/whats-new/security-changelog/). A summary of known security vulnerabilities is shown at the bottom of this page.
The following versions of ClickHouse server are currently being supported with security updates:
Vulnerability notifications issued pre-release or during embargo periods are available to open source users and support customers registered for vulnerability alerts. Refer to our [Embargo Policy](#embargo-policy) below.
The following versions of ClickHouse server are currently supported with security updates:
"""
FOOTER = """## Reporting a Vulnerability
We're extremely grateful to security researchers and users who report vulnerabilities to the ClickHouse Open Source Community. All reports are thoroughly investigated by developers.
To report a potential vulnerability in ClickHouse please send the details about it to [security@clickhouse.com](mailto:security@clickhouse.com). We do not offer any financial rewards for reporting issues to us using this method. Alternatively, you can also submit your findings through our public bug bounty program hosted by [Bugcrowd](https://bugcrowd.com/clickhouse) and be rewarded for it as per the program scope and rules of engagement.
To report a potential vulnerability in ClickHouse, please submit the details through our public bug bounty program hosted by [Bugcrowd](https://bugcrowd.com/clickhouse) and be rewarded as per the program scope and rules of engagement.
### When Should I Report a Vulnerability?
@@ -45,6 +46,24 @@ As the security issue moves from triage, to identified fix, to release planning
## Public Disclosure Timing
A public disclosure date is negotiated by the ClickHouse maintainers and the bug submitter. We prefer to fully disclose the bug as soon as possible once a user mitigation is available. It is reasonable to delay disclosure when the bug or the fix is not yet fully understood, the solution is not well-tested, or for vendor coordination. The timeframe for disclosure is from immediate (especially if it's already publicly known) to 90 days. For a vulnerability with a straightforward mitigation, we expect the report date to disclosure date to be on the order of 7 days.
## Embargo Policy
Open source users and support customers may subscribe to receive alerts during the embargo period by visiting [https://trust.clickhouse.com/?product=clickhouseoss](https://trust.clickhouse.com/?product=clickhouseoss), requesting access and subscribing for alerts. Subscribers agree not to make these notifications public, issue communications, share this information with others, or issue public patches before the disclosure date. Accidental disclosures must be reported immediately to trust@clickhouse.com. Failure to follow this policy or repeated leaks may result in removal from the subscriber list.
Participation criteria:
1. Be a current open source user or support customer with a valid corporate email domain (no @gmail.com, @azure.com, etc.).
1. Sign up to the ClickHouse OSS Trust Center at [https://trust.clickhouse.com](https://trust.clickhouse.com).
1. Accept the ClickHouse Security Vulnerability Response Policy as outlined above.
1. Subscribe to ClickHouse OSS Trust Center alerts.
Removal criteria:
1. Members may be removed for failure to follow this policy or repeated leaks.
1. Members may be removed for bounced messages (mail delivery failure).
1. Members may unsubscribe at any time.
Notification process:
ClickHouse will post notifications within our OSS Trust Center and notify subscribers. Subscribers must log in to the Trust Center to download the notification. The notification will include the timeframe for public disclosure.
"""