Merge remote-tracking branch 'origin' into cache-for-object-storage-table-engines

kssenii 2024-10-03 12:51:54 +02:00
commit c518232391
239 changed files with 2209 additions and 1964 deletions

View File

@ -1 +1,4 @@
# See contrib/usearch-cmake/CMakeLists.txt
set (FP16_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/FP16/")
add_library(_fp16 INTERFACE)
target_include_directories(_fp16 SYSTEM INTERFACE ${FP16_PROJECT_DIR}/include)

contrib/SimSIMD vendored

@ -1 +1 @@
Subproject commit 91a76d1ac519b3b9dc8957734a3dabd985f00c26
Subproject commit ff51434d90c66f916e94ff05b24530b127aa4cff

View File

@ -1 +1,4 @@
# See contrib/usearch-cmake/CMakeLists.txt
set(SIMSIMD_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/SimSIMD")
add_library(_simsimd INTERFACE)
target_include_directories(_simsimd SYSTEM INTERFACE "${SIMSIMD_PROJECT_DIR}/include")

contrib/usearch vendored

@ -1 +1 @@
Subproject commit 7a8967cb442b08ca20c3dd781414378e65957d37
Subproject commit d1d33eac94acd3b628e0b446c927ec3295ef63c7

View File

@ -1,14 +1,9 @@
set(FP16_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/FP16")
set(SIMSIMD_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/SimSIMD")
set(USEARCH_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/usearch")
add_library(_usearch INTERFACE)
target_include_directories(_usearch SYSTEM INTERFACE ${USEARCH_PROJECT_DIR}/include)
target_include_directories(_usearch SYSTEM INTERFACE
${FP16_PROJECT_DIR}/include
${SIMSIMD_PROJECT_DIR}/include
${USEARCH_PROJECT_DIR}/include)
target_link_libraries(_usearch INTERFACE _fp16)
target_compile_definitions(_usearch INTERFACE USEARCH_USE_FP16LIB)
# target_compile_definitions(_usearch INTERFACE USEARCH_USE_SIMSIMD)

View File

@ -7,14 +7,24 @@ import subprocess
import sys
def build_docker_deps(image_name, imagedir):
cmd = f"""docker run --entrypoint "/bin/bash" {image_name} -c "pip install pipdeptree 2>/dev/null 1>/dev/null && pipdeptree --freeze --warn silence | sed 's/ \+//g' | sort | uniq" > {imagedir}/requirements.txt"""
def build_docker_deps(image_name: str, imagedir: str) -> None:
print("Fetch the newest manifest for", image_name)
pip_cmd = (
"pip install pipdeptree 2>/dev/null 1>/dev/null && pipdeptree --freeze "
"--warn silence --exclude pipdeptree"
)
# /=/!d - remove dependencies without pin
# ubuntu - ignore system packages
# \s - remove spaces
sed = r"sed '/==/!d; /==.*+ubuntu/d; s/\s//g'"
cmd = rf"""docker run --rm --entrypoint "/bin/bash" {image_name} -c "{pip_cmd} | {sed} | sort -u" > {imagedir}/requirements.txt"""
print("Running the command:", cmd)
subprocess.check_call(cmd, shell=True)
def check_docker_file_install_with_pip(filepath):
image_name = None
with open(filepath, "r") as f:
with open(filepath, "r", encoding="utf-8") as f:
for line in f:
if "docker build" in line:
arr = line.split(" ")
@ -25,7 +35,7 @@ def check_docker_file_install_with_pip(filepath):
return image_name, False
def process_affected_images(images_dir):
def process_affected_images(images_dir: str) -> None:
for root, _dirs, files in os.walk(images_dir):
for f in files:
if f == "Dockerfile":

View File

@ -48,7 +48,7 @@ RUN curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add - \
&& add-apt-repository "deb https://download.docker.com/linux/ubuntu $(lsb_release -c -s) ${DOCKER_CHANNEL}" \
&& apt-get update \
&& env DEBIAN_FRONTEND=noninteractive apt-get install --yes \
docker-ce='5:23.*' \
docker-ce='5:23.*' docker-compose-plugin='2.29.*' \
&& rm -rf \
/var/lib/apt/lists/* \
/var/cache/debconf \

View File

@ -1,15 +1,13 @@
PyHDFS==0.3.1
PyJWT==2.3.0
PyMySQL==1.1.0
PyJWT==2.4.0
PyMySQL==1.1.1
PyNaCl==1.5.0
PyYAML==5.3.1
SecretStorage==3.3.1
argon2-cffi-bindings==21.2.0
argon2-cffi==23.1.0
async-timeout==4.0.3
asyncio==3.4.3
attrs==23.2.0
avro==1.10.2
avro==1.11.3
azure-core==1.30.1
azure-storage-blob==12.19.0
bcrypt==4.1.3
@ -24,18 +22,13 @@ cffi==1.16.0
charset-normalizer==3.3.2
click==8.1.7
confluent-kafka==2.3.0
cryptography==3.4.8
cryptography==42.0.0
dbus-python==1.2.18
decorator==5.1.1
delta-spark==2.3.0
deltalake==0.16.0
dict2xml==1.7.4
dicttoxml==1.7.16
distro-info==1.1+ubuntu0.2
distro==1.7.0
docker-compose==1.29.2
docker==6.1.3
dockerpty==0.4.1
docopt==0.6.2
exceptiongroup==1.2.1
execnet==2.1.1
geomet==0.2.1.post1
@ -49,7 +42,6 @@ iniconfig==2.0.0
isodate==0.6.1
jeepney==0.7.1
jmespath==1.0.1
jsonschema==3.2.0
jwcrypto==1.5.6
kafka-python==2.0.2
kazoo==2.9.0
@ -63,23 +55,22 @@ lz4==4.3.3
minio==7.2.3
more-itertools==8.10.0
nats-py==2.6.0
numpy==2.1.0
oauthlib==3.2.0
packaging==24.0
paramiko==3.4.0
pika==1.2.0
pip==24.1.1
pipdeptree==2.23.0
pluggy==1.5.0
protobuf==4.25.2
psycopg2-binary==2.9.6
py4j==0.10.9.5
py==1.11.0
pyarrow-hotfix==0.6
pyarrow==17.0.0
pycparser==2.22
pycryptodome==3.20.0
pymongo==3.11.0
pyparsing==2.4.7
pyrsistent==0.20.0
pyspark==3.3.2
pyspnego==0.10.2
pytest-order==1.0.0
@ -89,28 +80,22 @@ pytest-reportlog==0.4.0
pytest-timeout==2.2.0
pytest-xdist==3.5.0
pytest==7.4.4
python-apt==2.4.0+ubuntu3
python-dateutil==2.9.0.post0
python-dotenv==0.21.1
pytz==2023.3.post1
redis==5.0.1
requests-kerberos==0.14.0
requests==2.31.0
retry==0.9.2
s3transfer==0.10.1
setuptools==59.6.0
setuptools==70.0.0
simplejson==3.19.2
six==1.16.0
soupsieve==2.5
texttable==1.7.0
tomli==2.0.1
typing_extensions==4.11.0
tzlocal==2.1
unattended-upgrades==0.1
urllib3==2.0.7
wadllib==1.3.6
websocket-client==0.59.0
wheel==0.37.1
websocket-client==1.8.0
wheel==0.38.1
zipp==1.0.0
deltalake==0.16.0

View File

@ -1064,4 +1064,32 @@ Possible values:
- throw, drop, rebuild
Default value: throw
Default value: throw
## min_free_disk_bytes_to_perform_insert
The minimum number of bytes of free disk space required to perform an insert. If the amount of free disk space is less than `min_free_disk_bytes_to_perform_insert`, an exception is thrown and the insert is not executed. Note that this setting:
- takes into account the `keep_free_space_bytes` setting.
- does not take into account the amount of data that will be written by the `INSERT` operation.
- is only checked if a positive (non-zero) number of bytes is specified.
Possible values:
- Any positive integer.
Default value: 0 bytes.
## min_free_disk_ratio_to_perform_insert
The minimum ratio of free to total disk space required to perform an `INSERT`. Must be a floating point value between 0 and 1. Note that this setting:
- takes into account the `keep_free_space_bytes` setting.
- does not take into account the amount of data that will be written by the `INSERT` operation.
- is only checked if a positive (non-zero) ratio is specified.
Possible values:
- Float, 0.0 - 1.0
Default value: 0.0
Note that if both `min_free_disk_ratio_to_perform_insert` and `min_free_disk_bytes_to_perform_insert` are specified, ClickHouse uses whichever value requires the larger amount of free disk space before allowing inserts.
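A minimal usage sketch, assuming both settings are applied at the session level (the table `events` is hypothetical):

```sql
-- Require at least 1 GiB of free disk space and at least 10% of the disk
-- to be free before any INSERT in this session is executed.
SET min_free_disk_bytes_to_perform_insert = 1073741824;
SET min_free_disk_ratio_to_perform_insert = 0.1;

-- Hypothetical table; the insert throws an exception if the stricter of the
-- two thresholds is not met.
INSERT INTO events VALUES (now(), 'example row');
```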

View File

@ -2045,7 +2045,7 @@ Possible values:
- 0 - Disabled.
- 1 - Enabled.
Default value: `0`.
Default value: `1`.
### async_insert_busy_timeout_min_ms {#async-insert-busy-timeout-min-ms}
@ -5683,6 +5683,12 @@ Enable `IF NOT EXISTS` for `CREATE` statement by default. If either this setting
Default value: `false`.
## enable_secure_identifiers
If enabled, allow only secure identifiers, which contain only alphanumeric characters and underscores.
Default value: `false`.
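A hedged sketch of the intended effect (table names are made up); with the setting enabled, identifiers containing characters other than letters, digits, and underscores are expected to be rejected:

```sql
SET enable_secure_identifiers = 1;

-- Accepted: only alphanumeric characters and underscores.
CREATE TABLE safe_table_1 (id UInt64) ENGINE = Memory;

-- Expected to fail: the quoted identifier contains a dash and an exclamation mark.
CREATE TABLE `bad-name!` (id UInt64) ENGINE = Memory;
```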
## show_create_query_identifier_quoting_rule
Define identifier quoting behavior of the show create query result:

View File

@ -19,9 +19,7 @@ avgWeighted(x, weight)
- `weight` — Weights of the values.
`x` and `weight` must both be
[Integer](../../../sql-reference/data-types/int-uint.md),
[floating-point](../../../sql-reference/data-types/float.md), or
[Decimal](../../../sql-reference/data-types/decimal.md),
[Integer](../../../sql-reference/data-types/int-uint.md) or [floating-point](../../../sql-reference/data-types/float.md),
but may have different types.
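For example, a short sketch mixing an integer value column with floating-point weights, using the `values` table function:

```sql
SELECT avgWeighted(x, w)
FROM values('x Int8, w Float64', (4, 1.0), (1, 0.0), (10, 2.0));
-- Expected result: (4*1.0 + 1*0.0 + 10*2.0) / (1.0 + 0.0 + 2.0) = 8
```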
**Returned value**

View File

@ -1623,6 +1623,7 @@ If unit `WEEK` was specified, `toStartOfInterval` assumes that weeks start on Mo
toStartOfInterval(value, INTERVAL x unit[, time_zone])
toStartOfInterval(value, INTERVAL x unit[, origin[, time_zone]])
```
Aliases: `time_bucket`, `date_bin`.
The second overload emulates TimescaleDB's `time_bucket()` function and PostgreSQL's `date_bin()` function, e.g.
@ -1630,7 +1631,7 @@ The second overload emulates TimescaleDB's `time_bucket()` function, respectivel
SELECT toStartOfInterval(toDateTime('2023-01-01 14:45:00'), INTERVAL 1 MINUTE, toDateTime('2023-01-01 14:35:30'));
```
result:
Result:
``` reference
┌───toStartOfInterval(...)─┐
@ -1638,8 +1639,6 @@ result:
└──────────────────────────┘
```
Aliases: `time_bucket`, `date_bin`.
**See Also**
- [date_trunc](#date_trunc)
@ -1975,6 +1974,38 @@ Result:
Converts a date, or date with time, to a UInt16 number containing the ISO Year number.
**Syntax**
```sql
toISOYear(value)
```
**Arguments**
- `value` — A date or date with time.
**Returned value**
- The ISO year number of `value`. [UInt16](../data-types/int-uint.md).
**Example**
Query:
```sql
SELECT
toISOYear(toDate('2024/10/02')) as year1,
toISOYear(toDateTime('2024/10/02 01:30:00')) as year2
```
Result:
```response
┌─year1─┬─year2─┐
│ 2024 │ 2024 │
└───────┴───────┘
```
## toISOWeek
Converts a date, or date with time, to a UInt8 number containing the ISO Week number.
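A small sketch, analogous to the `toISOYear` example above:

```sql
SELECT toISOWeek(toDate('2024/10/02')) AS week;
-- 2024-10-02 falls in ISO week 40, so this should return 40.
```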

View File

@ -280,6 +280,38 @@ SELECT
Same as `toIPv4`, but if the IPv4 address has an invalid format, it returns `0.0.0.0` (0 IPv4).
**Syntax**
```sql
toIPv4OrDefault(value)
```
**Arguments**
- `value` — A string containing an IPv4 address.
**Returned value**
- `value` converted to an IPv4 address, or `0.0.0.0` if the format is invalid. [IPv4](../data-types/ipv4.md).
**Example**
Query:
```sql
SELECT
toIPv4OrDefault('192.168.0.1') AS s1,
toIPv4OrDefault('192.168.0') AS s2
```
Result:
```response
┌─s1──────────┬─s2──────┐
│ 192.168.0.1 │ 0.0.0.0 │
└─────────────┴─────────┘
```
## toIPv4OrNull(string)
Same as `toIPv4`, but if the IPv4 address has an invalid format, it returns null.
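A short sketch of the difference from `toIPv4OrDefault`: an invalid address yields `NULL` instead of `0.0.0.0`.

```sql
SELECT
    toIPv4OrNull('192.168.0.1') AS s1,
    toIPv4OrNull('192.168.0') AS s2;  -- s2 is NULL
```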

View File

@ -207,7 +207,31 @@ If `NULL` is passed, then the function returns type `Nullable(Nothing)`, which c
**Syntax**
```sql
toTypeName(x)
toTypeName(value)
```
**Arguments**
- `value` — A value of arbitrary type.
**Returned value**
- The name of the data type of `value`. [String](../data-types/string.md).
**Example**
Query:
```sql
SELECT toTypeName(123);
```
Result:
```response
┌─toTypeName(123)─┐
│ UInt8 │
└─────────────────┘
```
## blockSize {#blockSize}
@ -500,6 +524,30 @@ Useful in table engine parameters of `CREATE TABLE` queries where you need to sp
currentDatabase()
```
**Arguments**
None.
**Returned value**
- The name of the current database. [String](../data-types/string.md).
**Example**
Query:
```sql
SELECT currentDatabase()
```
Result:
```response
┌─currentDatabase()─┐
│ default │
└───────────────────┘
```
## currentUser {#currentUser}
Returns the name of the current user. In case of a distributed query, the name of the user who initiated the query is returned.
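A trivial sketch; the returned name depends on the connection:

```sql
SELECT currentUser();
-- e.g. 'default' when connected as the default user.
```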

View File

@ -272,8 +272,7 @@ ALTER TABLE table_name MODIFY COLUMN column_name RESET SETTING max_compress_bloc
## MATERIALIZE COLUMN
Materializes a column with a `DEFAULT` or `MATERIALIZED` value expression.
This statement can be used to rewrite existing column data after a `DEFAULT` or `MATERIALIZED` expression has been added or updated (which only updates the metadata but does not change existing data).
Materializes a column with a `DEFAULT` or `MATERIALIZED` value expression. When adding a materialized column using `ALTER TABLE table_name ADD COLUMN column_name MATERIALIZED`, existing rows without materialized values are not automatically filled. The `MATERIALIZE COLUMN` statement can be used to rewrite existing column data after a `DEFAULT` or `MATERIALIZED` expression has been added or updated (which only updates the metadata but does not change existing data).
Implemented as a [mutation](/docs/en/sql-reference/statements/alter/index.md#mutations).
For columns with a new or updated `MATERIALIZED` value expression, all existing rows are rewritten.
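A hedged sketch of the workflow described above (the table `hits` and column `url_domain` are hypothetical):

```sql
-- Adding the column only updates metadata; existing rows stay unfilled.
ALTER TABLE hits ADD COLUMN url_domain String MATERIALIZED domain(url);

-- Rewrite the existing data so older rows get the materialized value too.
ALTER TABLE hits MATERIALIZE COLUMN url_domain;
```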

View File

@ -7,7 +7,7 @@ sidebar_label: EXISTS
# EXISTS Statement
``` sql
EXISTS [TEMPORARY] [TABLE|DICTIONARY] [db.]name [INTO OUTFILE filename] [FORMAT format]
EXISTS [TEMPORARY] [TABLE|DICTIONARY|DATABASE] [db.]name [INTO OUTFILE filename] [FORMAT format]
```
Returns a single `UInt8`-type column, which contains the single value `0` if the specified table, dictionary, or database does not exist, or `1` if it exists.
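A minimal sketch of the `DATABASE` form added here (`db1` and `visits` are hypothetical names):

```sql
EXISTS DATABASE db1;
EXISTS TABLE db1.visits;
```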

View File

@ -495,6 +495,7 @@ WITH toDateTime64('2020-01-01 10:20:30.999', 3) AS dt64 SELECT toStartOfSecond(d
toStartOfInterval(value, INTERVAL x unit[, time_zone])
toStartOfInterval(value, INTERVAL x unit[, origin[, time_zone]])
```
Aliases: `time_bucket`, `date_bin`.
The second overload emulates TimescaleDB's `time_bucket()` function and PostgreSQL's `date_bin()` function, for example:
@ -510,8 +511,6 @@ SELECT toStartOfInterval(toDateTime('2023-01-01 14:45:00'), INTERVAL 1 MINUTE, t
└──────────────────────────┘
```
Aliases: `time_bucket`, `date_bin`.
**See also**
- [date_trunc](#date_trunc)

View File

@ -21,8 +21,7 @@ avgWeighted(x, weight)
`x` and `weight` must both be of type
[Integer](../../../sql-reference/data-types/int-uint.md), or
[floating-point](../../../sql-reference/data-types/float.md), or
[Decimal](../../../sql-reference/data-types/decimal.md),
[floating-point](../../../sql-reference/data-types/float.md),
but they may differ.
**Returned value**

View File

@ -36,7 +36,7 @@
#include <Common/ZooKeeper/ZooKeeperNodeCache.h>
#include <Common/formatReadable.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/getNumberOfCPUCoresToUse.h>
#include <Common/getExecutablePath.h>
#include <Common/ProfileEvents.h>
#include <Common/Scheduler/IResourceManager.h>
@ -833,11 +833,13 @@ try
const size_t physical_server_memory = getMemoryAmount();
LOG_INFO(log, "Available RAM: {}; logical cores: {}; used cores: {}.",
LOG_INFO(
log,
"Available RAM: {}; logical cores: {}; used cores: {}.",
formatReadableSizeWithBinarySuffix(physical_server_memory),
std::thread::hardware_concurrency(),
getNumberOfPhysicalCPUCores() // on ARM processors it can show only enabled at current moment cores
);
getNumberOfCPUCoresToUse() // on ARM processors it can show only enabled at current moment cores
);
#if defined(__x86_64__)
String cpu_info;
@ -1060,8 +1062,9 @@ try
0, // We don't need any threads once all the parts will be deleted
server_settings.max_parts_cleaning_thread_pool_size);
auto max_database_replicated_create_table_thread_pool_size = server_settings.max_database_replicated_create_table_thread_pool_size ?
server_settings.max_database_replicated_create_table_thread_pool_size : getNumberOfPhysicalCPUCores();
auto max_database_replicated_create_table_thread_pool_size = server_settings.max_database_replicated_create_table_thread_pool_size
? server_settings.max_database_replicated_create_table_thread_pool_size
: getNumberOfCPUCoresToUse();
getDatabaseReplicatedCreateTablesThreadPool().initialize(
max_database_replicated_create_table_thread_pool_size,
0, // We don't need any threads once all the tables will be created
@ -1640,7 +1643,7 @@ try
concurrent_threads_soft_limit = new_server_settings.concurrent_threads_soft_limit_num;
if (new_server_settings.concurrent_threads_soft_limit_ratio_to_cores > 0)
{
auto value = new_server_settings.concurrent_threads_soft_limit_ratio_to_cores * getNumberOfPhysicalCPUCores();
auto value = new_server_settings.concurrent_threads_soft_limit_ratio_to_cores * getNumberOfCPUCoresToUse();
if (value > 0 && value < concurrent_threads_soft_limit)
concurrent_threads_soft_limit = value;
}

View File

@ -13,19 +13,19 @@ max-statements=200
[tool.pylint.'MESSAGES CONTROL']
# pytest.mark.parametrize is not callable (not-callable)
disable = '''
missing-docstring,
too-few-public-methods,
invalid-name,
too-many-arguments,
too-many-locals,
too-many-instance-attributes,
bare-except,
broad-except,
cell-var-from-loop,
fixme,
invalid-name,
missing-docstring,
redefined-outer-name,
too-few-public-methods,
too-many-arguments,
too-many-instance-attributes,
too-many-locals,
too-many-public-methods,
wildcard-import,
redefined-outer-name,
broad-except,
bare-except,
'''
[tool.isort]

View File

@ -623,7 +623,7 @@ AuthResult AccessControl::authenticate(const Credentials & credentials, const Po
/// We use the same message for all authentication failures because we don't want to give away any unnecessary information for security reasons,
/// only the log will show the exact reason.
throw Exception(PreformattedMessage{message.str(),
"{}: Authentication failed: password is incorrect, or there is no user with such name.{}",
"{}: Authentication failed: password is incorrect, or there is no user with such name",
std::vector<std::string>{credentials.getUserName()}},
ErrorCodes::AUTHENTICATION_FAILED);
}

View File

@ -447,51 +447,6 @@ public:
}
}
void readAndMerge(DB::ReadBuffer & rb)
{
UInt8 rhs_skip_degree = 0;
DB::readBinaryLittleEndian(rhs_skip_degree, rb);
if (rhs_skip_degree > skip_degree)
{
skip_degree = rhs_skip_degree;
rehash();
}
size_t rhs_size = 0;
DB::readVarUInt(rhs_size, rb);
if (rhs_size > UNIQUES_HASH_MAX_SIZE)
throw Poco::Exception("Cannot read UniquesHashSet: too large size_degree.");
if ((1ULL << size_degree) < rhs_size)
{
UInt8 new_size_degree = std::max(UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE, static_cast<int>(log2(rhs_size - 1)) + 2);
resize(new_size_degree);
}
if (rhs_size <= 1)
{
for (size_t i = 0; i < rhs_size; ++i)
{
HashValue x = 0;
DB::readBinaryLittleEndian(x, rb);
insertHash(x);
}
}
else
{
auto hs = std::make_unique<HashValue[]>(rhs_size);
rb.readStrict(reinterpret_cast<char *>(hs.get()), rhs_size * sizeof(HashValue));
for (size_t i = 0; i < rhs_size; ++i)
{
DB::transformEndianness<std::endian::native, std::endian::little>(hs[i]);
insertHash(hs[i]);
}
}
}
static void skip(DB::ReadBuffer & rb)
{
size_t size = 0;

View File

@ -23,11 +23,13 @@ JoinNode::JoinNode(QueryTreeNodePtr left_table_expression_,
QueryTreeNodePtr join_expression_,
JoinLocality locality_,
JoinStrictness strictness_,
JoinKind kind_)
JoinKind kind_,
bool is_using_join_expression_)
: IQueryTreeNode(children_size)
, locality(locality_)
, strictness(strictness_)
, kind(kind_)
, is_using_join_expression(is_using_join_expression_)
{
children[left_table_expression_child_index] = std::move(left_table_expression_);
children[right_table_expression_child_index] = std::move(right_table_expression_);
@ -45,7 +47,7 @@ ASTPtr JoinNode::toASTTableJoin() const
{
auto join_expression_ast = children[join_expression_child_index]->toAST();
if (children[join_expression_child_index]->getNodeType() == QueryTreeNodeType::LIST)
if (is_using_join_expression)
join_ast->using_expression_list = std::move(join_expression_ast);
else
join_ast->on_expression = std::move(join_expression_ast);
@ -82,7 +84,8 @@ void JoinNode::dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state, si
bool JoinNode::isEqualImpl(const IQueryTreeNode & rhs, CompareOptions) const
{
const auto & rhs_typed = assert_cast<const JoinNode &>(rhs);
return locality == rhs_typed.locality && strictness == rhs_typed.strictness && kind == rhs_typed.kind;
return locality == rhs_typed.locality && strictness == rhs_typed.strictness && kind == rhs_typed.kind &&
is_using_join_expression == rhs_typed.is_using_join_expression;
}
void JoinNode::updateTreeHashImpl(HashState & state, CompareOptions) const
@ -90,11 +93,14 @@ void JoinNode::updateTreeHashImpl(HashState & state, CompareOptions) const
state.update(locality);
state.update(strictness);
state.update(kind);
state.update(is_using_join_expression);
}
QueryTreeNodePtr JoinNode::cloneImpl() const
{
return std::make_shared<JoinNode>(getLeftTableExpression(), getRightTableExpression(), getJoinExpression(), locality, strictness, kind);
return std::make_shared<JoinNode>(
getLeftTableExpression(), getRightTableExpression(), getJoinExpression(),
locality, strictness, kind, is_using_join_expression);
}
ASTPtr JoinNode::toASTImpl(const ConvertToASTOptions & options) const

View File

@ -44,7 +44,8 @@ public:
QueryTreeNodePtr join_expression_,
JoinLocality locality_,
JoinStrictness strictness_,
JoinKind kind_);
JoinKind kind_,
bool is_using_join_expression_);
/// Get left table expression
const QueryTreeNodePtr & getLeftTableExpression() const
@ -91,13 +92,13 @@ public:
/// Returns true if join has USING join expression, false otherwise
bool isUsingJoinExpression() const
{
return hasJoinExpression() && getJoinExpression()->getNodeType() == QueryTreeNodeType::LIST;
return hasJoinExpression() && is_using_join_expression;
}
/// Returns true if join has ON join expression, false otherwise
bool isOnJoinExpression() const
{
return hasJoinExpression() && getJoinExpression()->getNodeType() != QueryTreeNodeType::LIST;
return hasJoinExpression() && !is_using_join_expression;
}
/// Get join locality
@ -154,6 +155,7 @@ private:
JoinLocality locality = JoinLocality::Unspecified;
JoinStrictness strictness = JoinStrictness::Unspecified;
JoinKind kind = JoinKind::Inner;
bool is_using_join_expression;
static constexpr size_t left_table_expression_child_index = 0;
static constexpr size_t right_table_expression_child_index = 1;

View File

@ -957,7 +957,9 @@ QueryTreeNodePtr QueryTreeBuilder::buildJoinTree(const ASTPtr & tables_in_select
std::move(join_expression),
table_join.locality,
result_join_strictness,
result_join_kind);
result_join_kind,
table_join.using_expression_list != nullptr);
join_node->setOriginalAST(table_element.table_join);
/** Original AST is not set because it will contain only join part and does

View File

@ -12,7 +12,7 @@
#include <Common/MemoryTracker.h>
#include <Common/scope_guard_safe.h>
#include <Common/Exception.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/getNumberOfCPUCoresToUse.h>
#include <Common/typeid_cast.h>
#include <Common/TerminalSize.h>
#include <Common/StringUtils.h>
@ -1630,7 +1630,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
client_context,
{},
client_context->getSettingsRef()[Setting::max_block_size],
getNumberOfPhysicalCPUCores());
getNumberOfCPUCoresToUse());
auto builder = plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(client_context),

View File

@ -770,9 +770,11 @@ ColumnPtr ColumnTuple::compress() const
return ColumnCompressed::create(size(), byte_size,
[my_compressed = std::move(compressed)]() mutable
{
for (auto & column : my_compressed)
column = column->decompress();
return ColumnTuple::create(my_compressed);
Columns decompressed;
decompressed.reserve(my_compressed.size());
for (const auto & column : my_compressed)
decompressed.push_back(column->decompress());
return ColumnTuple::create(decompressed);
});
}

View File

@ -1393,9 +1393,11 @@ ColumnPtr ColumnVariant::compress() const
return ColumnCompressed::create(size(), byte_size,
[my_local_discriminators_compressed = std::move(local_discriminators_compressed), my_offsets_compressed = std::move(offsets_compressed), my_compressed = std::move(compressed), my_local_to_global_discriminators = this->local_to_global_discriminators]() mutable
{
for (auto & variant : my_compressed)
variant = variant->decompress();
return ColumnVariant::create(my_local_discriminators_compressed->decompress(), my_offsets_compressed->decompress(), my_compressed, my_local_to_global_discriminators);
Columns decompressed;
decompressed.reserve(my_compressed.size());
for (const auto & variant : my_compressed)
decompressed.push_back(variant->decompress());
return ColumnVariant::create(my_local_discriminators_compressed->decompress(), my_offsets_compressed->decompress(), decompressed, my_local_to_global_discriminators);
});
}

View File

@ -12,7 +12,7 @@
#include <Common/setThreadName.h>
#include <Common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/getNumberOfCPUCoresToUse.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
@ -49,14 +49,14 @@ void logAboutProgress(LoggerPtr log, size_t processed, size_t total, AtomicStopw
AsyncLoader::Pool::Pool(const AsyncLoader::PoolInitializer & init)
: name(init.name)
, priority(init.priority)
, max_threads(init.max_threads > 0 ? init.max_threads : getNumberOfPhysicalCPUCores())
, max_threads(init.max_threads > 0 ? init.max_threads : getNumberOfCPUCoresToUse())
, thread_pool(std::make_unique<ThreadPool>(
init.metric_threads,
init.metric_active_threads,
init.metric_scheduled_threads,
/* max_threads = */ std::numeric_limits<size_t>::max(), // Unlimited number of threads, we do worker management ourselves
/* max_free_threads = */ 0, // We do not require free threads
/* queue_size = */0)) // Unlimited queue to avoid blocking during worker spawning
init.metric_threads,
init.metric_active_threads,
init.metric_scheduled_threads,
/* max_threads = */ std::numeric_limits<size_t>::max(), // Unlimited number of threads, we do worker management ourselves
/* max_free_threads = */ 0, // We do not require free threads
/* queue_size = */ 0)) // Unlimited queue to avoid blocking during worker spawning
{}
AsyncLoader::Pool::Pool(Pool&& o) noexcept
@ -491,7 +491,7 @@ void AsyncLoader::remove(const LoadJobSet & jobs)
void AsyncLoader::setMaxThreads(size_t pool, size_t value)
{
if (value == 0)
value = getNumberOfPhysicalCPUCores();
value = getNumberOfCPUCoresToUse();
std::unique_lock lock{mutex};
auto & p = pools[pool];
// Note that underlying `ThreadPool` always has unlimited `queue_size` and `max_threads`.

View File

@ -177,48 +177,6 @@ public:
}
}
void readAndMerge(DB::ReadBuffer & in)
{
auto container_type = getContainerType();
/// If readAndMerge is called with an empty state, just deserialize
/// the state is specified as a parameter.
if ((container_type == details::ContainerType::SMALL) && small.empty())
{
read(in);
return;
}
UInt8 v;
readBinary(v, in);
auto rhs_container_type = static_cast<details::ContainerType>(v);
auto max_container_type = details::max(container_type, rhs_container_type);
if (container_type != max_container_type)
{
if (max_container_type == details::ContainerType::MEDIUM)
toMedium();
else if (max_container_type == details::ContainerType::LARGE)
toLarge();
}
if (rhs_container_type == details::ContainerType::SMALL)
{
typename Small::Reader reader(in);
while (reader.next())
insert(reader.get());
}
else if (rhs_container_type == details::ContainerType::MEDIUM)
{
typename Medium::Reader reader(in);
while (reader.next())
insert(reader.get());
}
else if (rhs_container_type == details::ContainerType::LARGE)
getContainer<Large>().readAndMerge(in);
}
void write(DB::WriteBuffer & out) const
{
auto container_type = getContainerType();

View File

@ -16,11 +16,4 @@ public:
if (Base::buf[i].isZero(*this) && !rhs.buf[i].isZero(*this))
new (&Base::buf[i]) Cell(rhs.buf[i]);
}
/// NOTE: Currently this method isn't used. When it does, the ReadBuffer should
/// contain the Key explicitly.
// void readAndMerge(DB::ReadBuffer & rb)
// {
// }
};

View File

@ -16,7 +16,6 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int TOO_LARGE_ARRAY_SIZE;
}
}
@ -55,26 +54,6 @@ public:
if (!rhs.buf[i].isZero(*this))
this->insert(rhs.buf[i].getValue());
}
void readAndMerge(DB::ReadBuffer & rb)
{
Cell::State::read(rb);
size_t new_size = 0;
DB::readVarUInt(new_size, rb);
if (new_size > 100'000'000'000)
throw DB::Exception(DB::ErrorCodes::TOO_LARGE_ARRAY_SIZE, "The size of serialized hash table is suspiciously large: {}", new_size);
this->resize(new_size);
for (size_t i = 0; i < new_size; ++i)
{
Cell x;
x.read(rb);
this->insert(x.getValue());
}
}
};

View File

@ -353,18 +353,6 @@ public:
}
}
void readAndMerge(DB::ReadBuffer & in)
{
typename RankStore::Reader reader(in);
while (reader.next())
{
const auto & data = reader.get();
update(data.first, data.second);
}
in.ignore(sizeof(DenominatorCalculatorType) + sizeof(ZerosCounterType));
}
static void skip(DB::ReadBuffer & in)
{
in.ignore(sizeof(RankStore) + sizeof(DenominatorCalculatorType) + sizeof(ZerosCounterType));

View File

@ -113,24 +113,6 @@ public:
small.read(in);
}
void readAndMerge(DB::ReadBuffer & in)
{
bool is_rhs_large;
readBinary(is_rhs_large, in);
if (!isLarge() && is_rhs_large)
toLarge();
if (!is_rhs_large)
{
typename Small::Reader reader(in);
while (reader.next())
insert(reader.get());
}
else
large->readAndMerge(in);
}
void write(DB::WriteBuffer & out) const
{
writeBinary(isLarge(), out);

View File

@ -2,7 +2,7 @@
#include <Common/ProfileEvents.h>
#include <Common/setThreadName.h>
#include <Common/Exception.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/getNumberOfCPUCoresToUse.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/noexcept_scope.h>
@ -93,7 +93,7 @@ static constexpr auto DEFAULT_THREAD_NAME = "ThreadPool";
template <typename Thread>
ThreadPoolImpl<Thread>::ThreadPoolImpl(Metric metric_threads_, Metric metric_active_threads_, Metric metric_scheduled_jobs_)
: ThreadPoolImpl(metric_threads_, metric_active_threads_, metric_scheduled_jobs_, getNumberOfPhysicalCPUCores())
: ThreadPoolImpl(metric_threads_, metric_active_threads_, metric_scheduled_jobs_, getNumberOfCPUCoresToUse())
{
}

View File

@ -1,4 +1,4 @@
#include "getNumberOfPhysicalCPUCores.h"
#include "getNumberOfCPUCoresToUse.h"
#if defined(OS_LINUX)
# include <cmath>
@ -165,7 +165,7 @@ catch (...)
}
#endif
unsigned getNumberOfPhysicalCPUCoresImpl()
unsigned getNumberOfCPUCoresToUseImpl()
{
unsigned cores = std::thread::hardware_concurrency(); /// logical cores (with SMT/HyperThreading)
@ -189,9 +189,9 @@ unsigned getNumberOfPhysicalCPUCoresImpl()
}
unsigned getNumberOfPhysicalCPUCores()
unsigned getNumberOfCPUCoresToUse()
{
/// Calculate once.
static auto cores = getNumberOfPhysicalCPUCoresImpl();
static const unsigned cores = getNumberOfCPUCoresToUseImpl();
return cores;
}

View File

@ -0,0 +1,6 @@
#pragma once
/// Get the number of CPU cores to use. Depending on the machine size we choose
/// between the number of physical and logical cores.
/// Also under cgroups we respect possible cgroups limits.
unsigned getNumberOfCPUCoresToUse();

View File

@ -1,5 +0,0 @@
#pragma once
/// Get number of CPU cores without hyper-threading.
/// The calculation respects possible cgroups limits.
unsigned getNumberOfPhysicalCPUCores();

View File

@ -54,6 +54,7 @@ struct Settings;
M(UInt64, log_file_overallocate_size, 50 * 1024 * 1024, "If max_log_file_size is not set to 0, this value will be added to it for preallocating bytes on disk. If a log record is larger than this value, it could lead to uncaught out-of-space issues so a larger value is preferred", 0) \
M(UInt64, min_request_size_for_cache, 50 * 1024, "Minimal size of the request to cache the deserialization result. Caching can have negative effect on latency for smaller requests, set to 0 to disable", 0) \
M(UInt64, raft_limits_reconnect_limit, 50, "If connection to a peer is silent longer than this limit * (multiplied by heartbeat interval), we re-establish the connection.", 0) \
M(UInt64, raft_limits_response_limit, 20, "Total wait time for a response is calculated by multiplying response_limit with heart_beat_interval_ms", 0) \
M(Bool, async_replication, false, "Enable async replication. All write and read guarantees are preserved while better performance is achieved. Settings is disabled by default to not break backwards compatibility.", 0) \
M(Bool, experimental_use_rocksdb, false, "Use rocksdb as backend storage", 0) \
M(UInt64, latest_logs_cache_size_threshold, 1 * 1024 * 1024 * 1024, "Maximum total size of in-memory cache of latest log entries.", 0) \

View File

@ -411,7 +411,9 @@ KeeperContext::Storage KeeperContext::getLogsPathFromConfig(const Poco::Util::Ab
if (!fs::exists(path))
fs::create_directories(path);
return std::make_shared<DiskLocal>("LocalLogDisk", path);
auto disk = std::make_shared<DiskLocal>("LocalLogDisk", path);
disk->startup(Context::getGlobalContextInstance(), false);
return disk;
};
/// the most specialized path
@ -437,7 +439,9 @@ KeeperContext::Storage KeeperContext::getSnapshotsPathFromConfig(const Poco::Uti
if (!fs::exists(path))
fs::create_directories(path);
return std::make_shared<DiskLocal>("LocalSnapshotDisk", path);
auto disk = std::make_shared<DiskLocal>("LocalSnapshotDisk", path);
disk->startup(Context::getGlobalContextInstance(), false);
return disk;
};
/// the most specialized path
@ -463,7 +467,9 @@ KeeperContext::Storage KeeperContext::getStatePathFromConfig(const Poco::Util::A
if (!fs::exists(path))
fs::create_directories(path);
return std::make_shared<DiskLocal>("LocalStateFileDisk", path);
auto disk = std::make_shared<DiskLocal>("LocalStateFileDisk", path);
disk->startup(Context::getGlobalContextInstance(), false);
return disk;
};
if (config.has("keeper_server.state_storage_disk"))

View File

@ -26,7 +26,7 @@
#include <Common/LockMemoryExceptionInThread.h>
#include <Common/Stopwatch.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/getNumberOfCPUCoresToUse.h>
#if USE_SSL
# include <Server/CertificateReloader.h>
@ -444,7 +444,7 @@ void KeeperServer::launchRaftServer(const Poco::Util::AbstractConfiguration & co
/// At least 16 threads for network communication in asio.
/// asio is async framework, so even with 1 thread it should be ok, but
/// still as safeguard it's better to have some redundant capacity here
asio_opts.thread_pool_size_ = std::max(16U, getNumberOfPhysicalCPUCores());
asio_opts.thread_pool_size_ = std::max(16U, getNumberOfCPUCoresToUse());
if (state_manager->isSecure())
{
@ -506,6 +506,7 @@ void KeeperServer::launchRaftServer(const Poco::Util::AbstractConfiguration & co
nuraft::raft_server::limits raft_limits;
raft_limits.reconnect_limit_ = getValueOrMaxInt32AndLogWarning(coordination_settings->raft_limits_reconnect_limit, "raft_limits_reconnect_limit", log);
raft_limits.response_limit_ = getValueOrMaxInt32AndLogWarning(coordination_settings->raft_limits_response_limit, "response_limit", log);
raft_instance->set_raft_limits(raft_limits);
raft_instance->start_server(init_options.skip_initial_election_timeout_);
@ -1079,7 +1080,7 @@ ClusterUpdateActions KeeperServer::getRaftConfigurationDiff(const Poco::Util::Ab
void KeeperServer::applyConfigUpdateWithReconfigDisabled(const ClusterUpdateAction& action)
{
std::lock_guard _{server_write_mutex};
std::unique_lock server_write_lock{server_write_mutex};
if (is_recovering) return;
constexpr auto sleep_time = 500ms;
@ -1090,7 +1091,9 @@ void KeeperServer::applyConfigUpdateWithReconfigDisabled(const ClusterUpdateActi
auto backoff_on_refusal = [&](size_t i)
{
LOG_INFO(log, "Update was not accepted (try {}), backing off for {}", i + 1, sleep_time * (i + 1));
server_write_lock.unlock();
std::this_thread::sleep_for(sleep_time * (i + 1));
server_write_lock.lock();
};
const auto & coordination_settings = keeper_context->getCoordinationSettings();

View File

@ -58,36 +58,4 @@ void BlockInfo::read(ReadBuffer & in)
}
}
void BlockMissingValues::setBit(size_t column_idx, size_t row_idx)
{
RowsBitMask & mask = rows_mask_by_column_id[column_idx];
mask.resize(row_idx + 1);
mask[row_idx] = true;
}
void BlockMissingValues::setBits(size_t column_idx, size_t rows)
{
RowsBitMask & mask = rows_mask_by_column_id[column_idx];
mask.resize(rows);
std::fill(mask.begin(), mask.end(), true);
}
const BlockMissingValues::RowsBitMask & BlockMissingValues::getDefaultsBitmask(size_t column_idx) const
{
static RowsBitMask none;
auto it = rows_mask_by_column_id.find(column_idx);
if (it != rows_mask_by_column_id.end())
return it->second;
return none;
}
bool BlockMissingValues::hasDefaultBits(size_t column_idx) const
{
auto it = rows_mask_by_column_id.find(column_idx);
if (it == rows_mask_by_column_id.end())
return false;
const auto & col_mask = it->second;
return std::find(col_mask.begin(), col_mask.end(), true) != col_mask.end();
}
}

View File

@ -2,10 +2,6 @@
#include <base/types.h>
#include <unordered_map>
#include <vector>
namespace DB
{
@ -46,30 +42,4 @@ struct BlockInfo
void read(ReadBuffer & in);
};
/// Block extension to support delayed defaults. AddingDefaultsTransform uses it to replace missing values with column defaults.
class BlockMissingValues
{
public:
using RowsBitMask = std::vector<bool>; /// a bit per row for a column
/// Get mask for column, column_idx is index inside corresponding block
const RowsBitMask & getDefaultsBitmask(size_t column_idx) const;
/// Check that we have to replace default value at least in one of columns
bool hasDefaultBits(size_t column_idx) const;
/// Set bit for a specified row in a single column.
void setBit(size_t column_idx, size_t row_idx);
/// Set bits for all rows in a single column.
void setBits(size_t column_idx, size_t rows);
bool empty() const { return rows_mask_by_column_id.empty(); }
size_t size() const { return rows_mask_by_column_id.size(); }
void clear() { rows_mask_by_column_id.clear(); }
private:
using RowsMaskByColumnId = std::unordered_map<size_t, RowsBitMask>;
/// If rows_mask_by_column_id[column_id][row_id] is true related value in Block should be replaced with column default.
/// It could contain less columns and rows then related block.
RowsMaskByColumnId rows_mask_by_column_id;
};
}

View File

@ -0,0 +1,53 @@
#include <Core/BlockMissingValues.h>
namespace DB
{
void BlockMissingValues::setBit(size_t column_idx, size_t row_idx)
{
RowsBitMask & mask = rows_mask_by_column_id[column_idx];
mask.resize(row_idx + 1);
mask.set(row_idx, true);
}
void BlockMissingValues::setBits(size_t column_idx, size_t rows)
{
auto & mask = rows_mask_by_column_id[column_idx];
mask.set(0, std::min(mask.size(), rows), true);
mask.resize(rows, true);
}
const BlockMissingValues::RowsBitMask & BlockMissingValues::getDefaultsBitmask(size_t column_idx) const
{
return rows_mask_by_column_id[column_idx];
}
bool BlockMissingValues::hasDefaultBits(size_t column_idx) const
{
/// This is correct because the bitmask is resized only when a bit is set.
return !rows_mask_by_column_id[column_idx].empty();
}
void BlockMissingValues::clear()
{
for (auto & mask : rows_mask_by_column_id)
mask.clear();
}
bool BlockMissingValues::empty() const
{
return std::ranges::all_of(rows_mask_by_column_id, [&](const auto & mask)
{
return mask.empty();
});
}
size_t BlockMissingValues::size() const
{
size_t res = 0;
for (const auto & mask : rows_mask_by_column_id)
res += !mask.empty();
return res;
}
}

View File

@ -0,0 +1,44 @@
#pragma once
#include <vector>
#include <boost/dynamic_bitset.hpp>
#include <Processors/Chunk.h>
namespace DB
{
/// Block extension to support delayed defaults.
/// AddingDefaultsTransform uses it to replace missing values with column defaults.
class BlockMissingValues
{
public:
using RowsBitMask = boost::dynamic_bitset<>; /// a bit per row for a column
explicit BlockMissingValues(size_t num_columns) : rows_mask_by_column_id(num_columns) {}
/// Get mask for column, column_idx is index inside corresponding block
const RowsBitMask & getDefaultsBitmask(size_t column_idx) const;
/// Check that we have to replace default value at least in one of columns
bool hasDefaultBits(size_t column_idx) const;
/// Set bit for a specified row in a single column.
void setBit(size_t column_idx, size_t row_idx);
/// Set bits for all rows in a single column.
void setBits(size_t column_idx, size_t rows);
void clear();
bool empty() const;
size_t size() const;
private:
using RowsMaskByColumnId = std::vector<RowsBitMask>;
/// If rows_mask_by_column_id[column_id][row_id] is true related value in Block should be replaced with column default.
/// It could contain less rows than related block.
RowsMaskByColumnId rows_mask_by_column_id;
};
/// The same as above but can be used as a chunk info.
class ChunkMissingValues : public BlockMissingValues, public ChunkInfoCloneable<ChunkMissingValues>
{
};
}

View File

@ -359,6 +359,8 @@ namespace ErrorCodes
M(Int64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in the cluster. If a ddl request has not been performed on all hosts, a response will contain a timeout error and a request will be executed in an async mode. A negative value means infinite. Zero means async mode.", 0) \
M(Milliseconds, stream_flush_interval_ms, 7500, "Timeout for flushing data from streaming storages.", 0) \
M(Milliseconds, stream_poll_timeout_ms, 500, "Timeout for polling data from/to streaming storages.", 0) \
M(UInt64, min_free_disk_bytes_to_perform_insert, 0, "Minimum free disk space bytes to perform an insert.", 0) \
M(Double, min_free_disk_ratio_to_perform_insert, 0.0, "Minimum free disk space ratio to perform an insert.", 0) \
\
M(Bool, final, false, "Query with the FINAL modifier by default. If the engine does not support the FINAL, it does not have any effect. On queries with multiple tables, FINAL is applied only to those that support it. It also works on distributed tables", 0) \
\
@ -917,10 +919,11 @@ namespace ErrorCodes
M(UInt64, extract_key_value_pairs_max_pairs_per_row, 1000, "Max number of pairs that can be produced by the `extractKeyValuePairs` function. Used as a safeguard against consuming too much memory.", 0) ALIAS(extract_kvp_max_pairs_per_row) \
M(Bool, restore_replace_external_engines_to_null, false, "Replace all the external table engines to Null on restore. Useful for testing purposes", 0) \
M(Bool, restore_replace_external_table_functions_to_null, false, "Replace all table functions to Null on restore. Useful for testing purposes", 0) \
M(Bool, restore_replace_external_dictionary_source_to_null, false, "Replace external dictionary sources to Null on restore. Useful for testing purposes", 0) \
M(Bool, create_if_not_exists, false, "Enable IF NOT EXISTS for CREATE statements by default", 0) \
M(Bool, enable_secure_identifiers, false, "If enabled, only allow secure identifiers which contain only underscore and alphanumeric characters", 0) \
M(Bool, mongodb_throw_on_unsupported_query, true, "If enabled, MongoDB tables will return an error when a MongoDB query cannot be built. Otherwise, ClickHouse reads the full table and processes it locally. This option does not apply to the legacy implementation or when 'allow_experimental_analyzer=0'.", 0) \
\
\
/* ###################################### */ \
/* ######## EXPERIMENTAL FEATURES ####### */ \
/* ###################################### */ \

View File

@ -72,8 +72,12 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"enable_parallel_replicas", false, false, "Parallel replicas with read tasks became the Beta tier feature."},
{"parallel_replicas_mode", "read_tasks", "read_tasks", "This setting was introduced as a part of making parallel replicas feature Beta"},
{"filesystem_cache_name", "", "", "Filesystem cache name to use for stateless table engines or data lakes"},
{"restore_replace_external_dictionary_source_to_null", false, false, "New setting."},
{"show_create_query_identifier_quoting_rule", "when_necessary", "when_necessary", "New setting."},
{"show_create_query_identifier_quoting_style", "Backticks", "Backticks", "New setting."},
{"enable_secure_identifiers", false, false, "New setting."},
{"min_free_disk_bytes_to_perform_insert", 0, 0, "New setting."},
{"min_free_disk_ratio_to_perform_insert", 0.0, 0.0, "New setting."},
}
},
{"24.9",
@ -91,6 +95,9 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"join_to_sort_minimum_perkey_rows", 0, 40, "The lower limit of per-key average rows in the right table to determine whether to rerange the right table by key in left or inner join. This setting ensures that the optimization is not applied for sparse table keys"},
{"join_to_sort_maximum_table_rows", 0, 10000, "The maximum number of rows in the right table to determine whether to rerange the right table by key in left or inner join"},
{"allow_experimental_join_right_table_sorting", false, false, "If it is set to true, and the conditions of `join_to_sort_minimum_perkey_rows` and `join_to_sort_maximum_table_rows` are met, rerange the right table by key to improve the performance in left or inner hash join"},
{"mongodb_throw_on_unsupported_query", false, true, "New setting."},
{"min_free_disk_bytes_to_perform_insert", 0, 0, "Maintain some free disk space bytes from inserts while still allowing for temporary writing."},
{"min_free_disk_ratio_to_perform_insert", 0.0, 0.0, "Maintain some free disk space bytes expressed as ratio to total disk space from inserts while still allowing for temporary writing."},
}
},
{"24.8",

View File

@ -1,7 +1,7 @@
#include <Core/SettingsFields.h>
#include <Core/Field.h>
#include <Core/AccurateComparison.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/getNumberOfCPUCoresToUse.h>
#include <Common/logger_useful.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeString.h>
@ -262,7 +262,7 @@ void SettingFieldMaxThreads::readBinary(ReadBuffer & in)
UInt64 SettingFieldMaxThreads::getAuto()
{
return getNumberOfPhysicalCPUCores();
return getNumberOfCPUCoresToUse();
}
namespace

View File

@ -4,7 +4,7 @@
#include <Poco/Environment.h>
#include <Poco/Platform.h>
#include <Common/VersionNumber.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/getNumberOfCPUCoresToUse.h>
#include <Common/logger_useful.h>
@ -110,7 +110,7 @@ void doSettingsSanityCheckClamp(Settings & current_settings, LoggerPtr log)
};
UInt64 max_threads = get_current_value("max_threads").safeGet<UInt64>();
UInt64 max_threads_max_value = 256 * getNumberOfPhysicalCPUCores();
UInt64 max_threads_max_value = 256 * getNumberOfCPUCoresToUse();
if (max_threads > max_threads_max_value)
{
if (log)

View File

@ -12,7 +12,7 @@
#include <Common/ErrorCodes.h>
#include <Common/SymbolIndex.h>
#include <Common/StackTrace.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/getNumberOfCPUCoresToUse.h>
#include <Core/ServerUUID.h>
#include <IO/WriteHelpers.h>
@ -54,7 +54,7 @@ void setExtras(bool anonymize, const std::string & server_data_path)
/// Sentry does not support 64-bit integers.
sentry_set_extra("total_ram", sentry_value_new_string(formatReadableSizeWithBinarySuffix(getMemoryAmountOrZero()).c_str()));
sentry_set_extra("physical_cpu_cores", sentry_value_new_int32(getNumberOfPhysicalCPUCores()));
sentry_set_extra("cpu_cores", sentry_value_new_int32(getNumberOfCPUCoresToUse()));
if (!server_data_path.empty())
sentry_set_extra("disk_free_space", sentry_value_new_string(formatReadableSizeWithBinarySuffix(fs::space(server_data_path).free).c_str()));

View File

@ -24,7 +24,8 @@
#if USE_SIMDJSON
# include <Common/JSONParsers/SimdJSONParser.h>
#elif USE_RAPIDJSON
#endif
#if USE_RAPIDJSON
# include <Common/JSONParsers/RapidJSONParser.h>
#else
# include <Common/JSONParsers/DummyJSONParser.h>
@ -36,6 +37,7 @@ namespace Setting
{
extern const SettingsBool allow_experimental_object_type;
extern const SettingsBool use_json_alias_for_old_object_type;
extern const SettingsBool allow_simdjson;
}
namespace ErrorCodes
@ -127,12 +129,18 @@ SerializationPtr DataTypeObject::doGetDefaultSerialization() const
{
case SchemaFormat::JSON:
#if USE_SIMDJSON
return std::make_shared<SerializationJSON<SimdJSONParser>>(
std::move(typed_path_serializations),
paths_to_skip,
path_regexps_to_skip,
buildJSONExtractTree<SimdJSONParser>(getPtr(), "JSON serialization"));
#elif USE_RAPIDJSON
auto context = CurrentThread::getQueryContext();
if (!context)
context = Context::getGlobalContextInstance();
if (context->getSettingsRef()[Setting::allow_simdjson])
return std::make_shared<SerializationJSON<SimdJSONParser>>(
std::move(typed_path_serializations),
paths_to_skip,
path_regexps_to_skip,
buildJSONExtractTree<SimdJSONParser>(getPtr(), "JSON serialization"));
#endif
#if USE_RAPIDJSON
return std::make_shared<SerializationJSON<RapidJSONParser>>(
std::move(typed_path_serializations),
paths_to_skip,
@ -404,7 +412,7 @@ std::unique_ptr<ISerialization::SubstreamData> DataTypeObject::getDynamicSubcolu
else
{
res = std::make_unique<SubstreamData>(std::make_shared<SerializationDynamic>());
res->type = std::make_shared<DataTypeDynamic>();
res->type = std::make_shared<DataTypeDynamic>(max_dynamic_types);
}
/// If column was provided, we should create a column for requested subcolumn.

View File

@ -11,20 +11,6 @@
using namespace DB;
namespace
{
bool withFileCache(const ReadSettings & settings)
{
return settings.remote_fs_cache && settings.enable_filesystem_cache;
}
bool withPageCache(const ReadSettings & settings, bool with_file_cache)
{
return settings.page_cache && !with_file_cache && settings.use_page_cache_for_disks_without_file_cache;
}
}
namespace DB
{
namespace ErrorCodes
@ -35,7 +21,7 @@ namespace ErrorCodes
size_t chooseBufferSizeForRemoteReading(const DB::ReadSettings & settings, size_t file_size)
{
/// Only when cache is used we could download bigger portions of FileSegments than what we actually gonna read within particular task.
if (!withFileCache(settings))
if (!settings.enable_filesystem_cache)
return settings.remote_fs_buffer_size;
/// Buffers used for prefetch and pre-download better to have enough size, but not bigger than the whole file.
@ -45,7 +31,6 @@ size_t chooseBufferSizeForRemoteReading(const DB::ReadSettings & settings, size_
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const std::string & cache_path_prefix_,
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_,
bool use_external_buffer_)
@ -54,12 +39,10 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
, settings(settings_)
, blobs_to_read(blobs_to_read_)
, read_buffer_creator(std::move(read_buffer_creator_))
, cache_path_prefix(cache_path_prefix_)
, cache_log(settings.enable_filesystem_cache_log ? cache_log_ : nullptr)
, query_id(CurrentThread::getQueryId())
, use_external_buffer(use_external_buffer_)
, with_file_cache(withFileCache(settings))
, with_page_cache(withPageCache(settings, with_file_cache))
, with_file_cache(settings.enable_filesystem_cache)
, log(getLogger("ReadBufferFromRemoteFSGather"))
{
if (!blobs_to_read.empty())
@ -74,47 +57,7 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
}
current_object = object;
const auto & object_path = object.remote_path;
std::unique_ptr<ReadBufferFromFileBase> buf;
if (with_file_cache)
{
if (settings.remote_fs_cache->isInitialized())
{
auto cache_key = settings.filecache_key.has_value() ? FileCacheKey::fromPath(object_path) : *settings.filecache_key;
buf = std::make_unique<CachedOnDiskReadBufferFromFile>(
object_path,
cache_key,
settings.remote_fs_cache,
FileCache::getCommonUser(),
[=, this]() { return read_buffer_creator(/* restricted_seek */true, object); },
settings,
query_id,
object.bytes_size,
/* allow_seeks */false,
/* use_external_buffer */true,
/* read_until_position */std::nullopt,
cache_log);
}
else
{
settings.remote_fs_cache->throwInitExceptionIfNeeded();
}
}
/// Can't wrap CachedOnDiskReadBufferFromFile in CachedInMemoryReadBufferFromFile because the
/// former doesn't support seeks.
if (with_page_cache && !buf)
{
auto inner = read_buffer_creator(/* restricted_seek */false, object);
auto cache_key = FileChunkAddress { .path = cache_path_prefix + object_path };
buf = std::make_unique<CachedInMemoryReadBufferFromFile>(
cache_key, settings.page_cache, std::move(inner), settings);
}
if (!buf)
buf = read_buffer_creator(/* restricted_seek */true, object);
auto buf = read_buffer_creator(/* restricted_seek */true, object);
if (read_until_position > start_offset && read_until_position < start_offset + object.bytes_size)
buf->setReadUntilPosition(read_until_position - start_offset);

View File

@ -26,7 +26,6 @@ public:
ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const std::string & cache_path_prefix_,
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_,
bool use_external_buffer_);
@ -71,12 +70,10 @@ private:
const ReadSettings settings;
const StoredObjects blobs_to_read;
const ReadBufferCreator read_buffer_creator;
const std::string cache_path_prefix;
const std::shared_ptr<FilesystemCacheLog> cache_log;
const String query_id;
const bool use_external_buffer;
const bool with_file_cache;
const bool with_page_cache;
size_t read_until_position = 0;
size_t file_offset_of_buffer_end = 0;

View File

@ -210,63 +210,14 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObject( /// NOLI
auto settings_ptr = settings.get();
return std::make_unique<ReadBufferFromAzureBlobStorage>(
client.get(), object.remote_path, patchSettings(read_settings), settings_ptr->max_single_read_retries,
settings_ptr->max_single_download_retries);
}
std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
{
ReadSettings disk_read_settings = patchSettings(read_settings);
auto settings_ptr = settings.get();
auto global_context = Context::getGlobalContextInstance();
auto read_buffer_creator =
[this, settings_ptr, disk_read_settings]
(bool restricted_seek, const StoredObject & object_) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromAzureBlobStorage>(
client.get(),
object_.remote_path,
disk_read_settings,
settings_ptr->max_single_read_retries,
settings_ptr->max_single_download_retries,
/* use_external_buffer */true,
restricted_seek);
};
switch (read_settings.remote_fs_method)
{
case RemoteFSReadMethod::read:
{
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
"azure:",
disk_read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */false);
}
case RemoteFSReadMethod::threadpool:
{
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
"azure:",
disk_read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */true);
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl), reader, disk_read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
}
client.get(),
object.remote_path,
patchSettings(read_settings),
settings_ptr->max_single_read_retries,
settings_ptr->max_single_download_retries,
read_settings.remote_read_buffer_use_external_buffer,
read_settings.remote_read_buffer_restrict_seek,
/* read_until_position */0);
}
/// Open the file for write and return WriteBufferFromFileBase object.

View File

@ -51,12 +51,6 @@ public:
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const StoredObject & object,

View File

@ -48,9 +48,7 @@ CachedObjectStorage::generateObjectKeyPrefixForDirectoryPath(const std::string &
ReadSettings CachedObjectStorage::patchSettings(const ReadSettings & read_settings) const
{
ReadSettings modified_settings{read_settings};
modified_settings.remote_fs_cache = cache;
return object_storage->patchSettings(modified_settings);
return object_storage->patchSettings(read_settings);
}
void CachedObjectStorage::startup()
@ -63,21 +61,45 @@ bool CachedObjectStorage::exists(const StoredObject & object) const
return object_storage->exists(object);
}
std::unique_ptr<ReadBufferFromFileBase> CachedObjectStorage::readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
return object_storage->readObjects(objects, patchSettings(read_settings), read_hint, file_size);
}
std::unique_ptr<ReadBufferFromFileBase> CachedObjectStorage::readObject( /// NOLINT
const StoredObject & object,
const ReadSettings & read_settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
if (read_settings.enable_filesystem_cache)
{
if (cache->isInitialized())
{
auto cache_key = cache->createKeyForPath(object.remote_path);
auto global_context = Context::getGlobalContextInstance();
auto modified_read_settings = read_settings.withNestedBuffer();
auto read_buffer_creator = [=, this]()
{
return object_storage->readObject(object, patchSettings(read_settings), read_hint, file_size);
};
return std::make_unique<CachedOnDiskReadBufferFromFile>(
object.remote_path,
cache_key,
cache,
FileCache::getCommonUser(),
read_buffer_creator,
modified_read_settings,
std::string(CurrentThread::getQueryId()),
object.bytes_size,
/* allow_seeks */!read_settings.remote_read_buffer_restrict_seek,
/* use_external_buffer */read_settings.remote_read_buffer_use_external_buffer,
/* read_until_position */std::nullopt,
global_context->getFilesystemCacheLog());
}
else
{
cache->throwInitExceptionIfNeeded();
}
}
return object_storage->readObject(object, patchSettings(read_settings), read_hint, file_size);
}


@ -37,12 +37,6 @@ public:
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const StoredObject & object,


@ -11,6 +11,9 @@
#include <Common/CurrentMetrics.h>
#include <Common/Scheduler/IResourceManager.h>
#include <Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.h>
#include <IO/CachedInMemoryReadBufferFromFile.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/ObjectStorages/DiskObjectStorageTransaction.h>
#include <Disks/FakeDiskTransaction.h>
#include <Poco/Util/AbstractConfiguration.h>
@ -496,16 +499,60 @@ std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
std::optional<size_t> file_size) const
{
const auto storage_objects = metadata_storage->getStorageObjects(path);
auto global_context = Context::getGlobalContextInstance();
const bool file_can_be_empty = !file_size.has_value() || *file_size == 0;
if (storage_objects.empty() && file_can_be_empty)
return std::make_unique<ReadBufferFromEmptyFile>();
return object_storage->readObjects(
auto read_settings = updateIOSchedulingSettings(settings, getReadResourceName(), getWriteResourceName());
    /// We wrap the read buffer from object storage (read_buf = object_storage->readObject())
    /// inside ReadBufferFromRemoteFSGather, so apply the nested-buffer settings.
read_settings = read_settings.withNestedBuffer();
auto read_buffer_creator =
[this, read_settings, read_hint, file_size]
(bool restricted_seek, const StoredObject & object_) mutable -> std::unique_ptr<ReadBufferFromFileBase>
{
read_settings.remote_read_buffer_restrict_seek = restricted_seek;
auto impl = object_storage->readObject(object_, read_settings, read_hint, file_size);
if ((!object_storage->supportsCache() || !read_settings.enable_filesystem_cache)
&& read_settings.page_cache && read_settings.use_page_cache_for_disks_without_file_cache)
{
/// Can't wrap CachedOnDiskReadBufferFromFile in CachedInMemoryReadBufferFromFile because the
/// former doesn't support seeks.
auto cache_path_prefix = fmt::format("{}:", magic_enum::enum_name(object_storage->getType()));
const auto object_namespace = object_storage->getObjectsNamespace();
if (!object_namespace.empty())
cache_path_prefix += object_namespace + "/";
const auto cache_key = FileChunkAddress { .path = cache_path_prefix + object_.remote_path };
impl = std::make_unique<CachedInMemoryReadBufferFromFile>(
cache_key, read_settings.page_cache, std::move(impl), read_settings);
}
return impl;
};
const bool use_async_buffer = read_settings.remote_fs_method == RemoteFSReadMethod::threadpool;
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
storage_objects,
updateIOSchedulingSettings(settings, getReadResourceName(), getWriteResourceName()),
read_hint,
file_size);
read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */use_async_buffer);
if (use_async_buffer)
{
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl), reader, read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
return impl;
}
std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
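/// Illustrative sketch only (not part of the change above), summarising the buffer layering that
/// readFile() now builds; all names are taken from the hunk above:
///
///     read_buffer_creator(restricted_seek, object)   -> object_storage->readObject(), optionally wrapped
///                                                        in CachedInMemoryReadBufferFromFile (page cache)
///     ReadBufferFromRemoteFSGather                    -> stitches the per-object buffers into one stream
///     AsynchronousBoundedReadBuffer                   -> added only when remote_fs_method == threadpool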


@ -82,28 +82,12 @@ std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObject( /// NOLIN
initializeHDFSFS();
auto path = extractObjectKeyFromURL(object);
return std::make_unique<ReadBufferFromHDFS>(
fs::path(url_without_path) / "", fs::path(data_directory) / path, config, patchSettings(read_settings));
}
std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
{
initializeHDFSFS();
auto disk_read_settings = patchSettings(read_settings);
auto read_buffer_creator =
[this, disk_read_settings]
(bool /* restricted_seek */, const StoredObject & object_) -> std::unique_ptr<ReadBufferFromFileBase>
{
auto path = extractObjectKeyFromURL(object_);
return std::make_unique<ReadBufferFromHDFS>(
fs::path(url_without_path) / "", fs::path(data_directory) / path, config, disk_read_settings, /* read_until_position */0, /* use_external_buffer */true);
};
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), objects, "hdfs:", disk_read_settings, nullptr, /* use_external_buffer */false);
fs::path(url_without_path) / "",
fs::path(data_directory) / path,
config,
patchSettings(read_settings),
/* read_until_position */0,
read_settings.remote_read_buffer_use_external_buffer);
}
std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOLINT


@ -69,12 +69,6 @@ public:
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const StoredObject & object,


@ -150,13 +150,6 @@ public:
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const = 0;
/// Read multiple objects with common prefix
virtual std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const = 0;
/// Open the file for write and return WriteBufferFromFileBase object.
virtual std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const StoredObject & object,


@ -40,47 +40,12 @@ bool LocalObjectStorage::exists(const StoredObject & object) const
return fs::exists(object.remote_path);
}
std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
{
auto modified_settings = patchSettings(read_settings);
auto global_context = Context::getGlobalContextInstance();
auto read_buffer_creator = [=](bool /* restricted_seek */, const StoredObject & object) -> std::unique_ptr<ReadBufferFromFileBase>
{ return std::make_unique<ReadBufferFromFile>(object.remote_path); };
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
"file:",
modified_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */ false);
}
ReadSettings LocalObjectStorage::patchSettings(const ReadSettings & read_settings) const
{
if (!read_settings.enable_filesystem_cache)
return IObjectStorage::patchSettings(read_settings);
auto modified_settings{read_settings};
/// For now we cannot allow asynchronous reader from local filesystem when CachedObjectStorage is used.
switch (modified_settings.local_fs_method)
{
case LocalFSReadMethod::pread_threadpool:
case LocalFSReadMethod::pread_fake_async:
{
modified_settings.local_fs_method = LocalFSReadMethod::pread;
LOG_INFO(log, "Changing local filesystem read method to `pread`");
break;
}
default:
{
break;
}
}
/// Other options might break assertions in AsynchronousBoundedReadBuffer.
modified_settings.local_fs_method = LocalFSReadMethod::pread;
modified_settings.direct_io_threshold = 0; /// Disable.
return IObjectStorage::patchSettings(modified_settings);
}


@ -34,12 +34,6 @@ public:
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const StoredObject & object,


@ -176,65 +176,6 @@ bool S3ObjectStorage::exists(const StoredObject & object) const
return S3::objectExists(*client.get(), uri.bucket, object.remote_path, {});
}
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
{
ReadSettings disk_read_settings = patchSettings(read_settings);
auto global_context = Context::getGlobalContextInstance();
auto settings_ptr = s3_settings.get();
auto read_buffer_creator =
[this, settings_ptr, disk_read_settings]
(bool restricted_seek, const StoredObject & object_) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromS3>(
client.get(),
uri.bucket,
object_.remote_path,
uri.version_id,
settings_ptr->request_settings,
disk_read_settings,
/* use_external_buffer */true,
/* offset */0,
/* read_until_position */0,
restricted_seek);
};
switch (read_settings.remote_fs_method)
{
case RemoteFSReadMethod::read:
{
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
"s3:" + uri.bucket + "/",
disk_read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */false);
}
case RemoteFSReadMethod::threadpool:
{
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
"s3:" + uri.bucket + "/",
disk_read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */true);
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl), reader, disk_read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
}
}
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObject( /// NOLINT
const StoredObject & object,
const ReadSettings & read_settings,
@ -248,7 +189,12 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObject( /// NOLINT
object.remote_path,
uri.version_id,
settings_ptr->request_settings,
patchSettings(read_settings));
patchSettings(read_settings),
read_settings.remote_read_buffer_use_external_buffer,
/* offset */0,
/* read_until_position */0,
read_settings.remote_read_buffer_restrict_seek,
object.bytes_size ? std::optional<size_t>(object.bytes_size) : std::nullopt);
}
std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLINT


@ -89,12 +89,6 @@ public:
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const StoredObject & object,


@ -233,69 +233,18 @@ WebObjectStorage::FileDataPtr WebObjectStorage::tryGetFileInfo(const String & pa
}
}
std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
if (objects.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "WebObjectStorage support read only from single object");
return readObject(objects[0], read_settings, read_hint, file_size);
}
std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
const StoredObject & object,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
{
size_t object_size = object.bytes_size;
auto read_buffer_creator =
[this, read_settings, object_size]
(bool /* restricted_seek */, const StoredObject & object_) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromWebServer>(
fs::path(url) / object_.remote_path,
getContext(),
object_size,
read_settings,
/* use_external_buffer */true);
};
auto global_context = Context::getGlobalContextInstance();
switch (read_settings.remote_fs_method)
{
case RemoteFSReadMethod::read:
{
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
StoredObjects{object},
"url:" + url + "/",
read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */false);
}
case RemoteFSReadMethod::threadpool:
{
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
StoredObjects{object},
"url:" + url + "/",
read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */true);
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl), reader, read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
}
return std::make_unique<ReadBufferFromWebServer>(
fs::path(url) / object.remote_path,
getContext(),
object.bytes_size,
read_settings,
read_settings.remote_read_buffer_use_external_buffer);
}
void WebObjectStorage::throwNotAllowed()


@ -39,12 +39,6 @@ public:
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const StoredObject & object,


@ -4,6 +4,7 @@
#include <Formats/MarkInCompressedFile.h>
#include <Common/PODArray.h>
#include <Core/Block.h>
#include <Core/BlockMissingValues.h>
namespace DB
{


@ -1,6 +1,7 @@
#pragma once
#include <Core/Block.h>
#include <Core/BlockMissingValues.h>
namespace DB
{


@ -1,5 +1,6 @@
#pragma once
#include <base/arithmeticOverflow.h>
#include <base/types.h>
#include <Core/DecimalFunctions.h>
#include <Common/Exception.h>
@ -178,7 +179,7 @@ struct ToStartOfDayImpl
}
static Int64 executeExtendedResult(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.fromDayNum(ExtendedDayNum(d)) * DecimalUtils::scaleMultiplier<DateTime64>(DataTypeDateTime64::default_scale);
return common::mulIgnoreOverflow(time_zone.fromDayNum(ExtendedDayNum(d)), DecimalUtils::scaleMultiplier<DateTime64>(DataTypeDateTime64::default_scale));
}
using FactorTransform = ZeroTransform;
@ -1980,22 +1981,19 @@ struct ToRelativeSubsecondNumImpl
return t.value;
if (scale > scale_multiplier)
return t.value / (scale / scale_multiplier);
return static_cast<UInt128>(t.value) * static_cast<UInt128>((scale_multiplier / scale));
/// Casting ^^: All integers are Int64, yet if t.value is big enough the multiplication can still
/// overflow which is UB. This place is too low-level and generic to check if t.value is sane.
/// Therefore just let it overflow safely and don't bother further.
return common::mulIgnoreOverflow(t.value, scale_multiplier / scale);
}
static Int64 execute(UInt32 t, const DateLUTImpl &)
{
return t * scale_multiplier;
return common::mulIgnoreOverflow(static_cast<Int64>(t), scale_multiplier);
}
static Int64 execute(Int32 d, const DateLUTImpl & time_zone)
{
return static_cast<Int64>(time_zone.fromDayNum(ExtendedDayNum(d))) * scale_multiplier;
return common::mulIgnoreOverflow(static_cast<Int64>(time_zone.fromDayNum(ExtendedDayNum(d))), scale_multiplier);
}
static Int64 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return static_cast<Int64>(time_zone.fromDayNum(DayNum(d)) * scale_multiplier);
return common::mulIgnoreOverflow(static_cast<Int64>(time_zone.fromDayNum(DayNum(d))), scale_multiplier);
}
using FactorTransform = ZeroTransform;
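/// Illustrative sketch only (not part of the change above): common::mulIgnoreOverflow comes from
/// base/arithmeticOverflow.h (included at the top of this file) and multiplies with wrap-around
/// semantics instead of signed-overflow UB, e.g. for the Date32 upper bound with DateTime64 scale 9:
///
///     Int64 seconds = time_zone.fromDayNum(ExtendedDayNum(d));              /// roughly 1e10 seconds
///     Int64 scaled  = common::mulIgnoreOverflow(seconds, scale_multiplier); /// 1e10 * 1e9 wraps instead of UB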


@ -104,7 +104,7 @@ struct ArrayAggregateImpl
static DataTypePtr getReturnType(const DataTypePtr & expression_return, const DataTypePtr & /*array_element*/)
{
if (aggregate_operation == AggregateOperation::max || aggregate_operation == AggregateOperation::min)
if constexpr (aggregate_operation == AggregateOperation::max || aggregate_operation == AggregateOperation::min)
{
return expression_return;
}
@ -152,9 +152,62 @@ struct ArrayAggregateImpl
return result;
}
template <AggregateOperation op = aggregate_operation>
requires(op == AggregateOperation::min || op == AggregateOperation::max)
static void executeMinOrMax(const ColumnPtr & mapped, const ColumnArray::Offsets & offsets, ColumnPtr & res_ptr)
{
const ColumnConst * const_column = checkAndGetColumn<ColumnConst>(&*mapped);
if (const_column)
{
MutableColumnPtr res_column = const_column->getDataColumn().cloneEmpty();
res_column->insertMany(const_column->getField(), offsets.size());
res_ptr = std::move(res_column);
return;
}
MutableColumnPtr res_column = mapped->cloneEmpty();
static constexpr int nan_null_direction_hint = aggregate_operation == AggregateOperation::min ? 1 : -1;
/// TODO: Introduce row_begin and row_end to getPermutation or an equivalent function to use that instead
/// (same use case as SingleValueDataBase::getSmallestIndex)
UInt64 start_of_array = 0;
for (auto end_of_array : offsets)
{
/// Array is empty
if (start_of_array == end_of_array)
{
res_column->insertDefault();
continue;
}
UInt64 index = start_of_array;
for (UInt64 i = index + 1; i < end_of_array; i++)
{
if constexpr (aggregate_operation == AggregateOperation::min)
{
if ((mapped->compareAt(i, index, *mapped, nan_null_direction_hint) < 0))
index = i;
}
else
{
if ((mapped->compareAt(i, index, *mapped, nan_null_direction_hint) > 0))
index = i;
}
}
res_column->insertFrom(*mapped, index);
start_of_array = end_of_array;
}
chassert(res_column->size() == offsets.size());
res_ptr = std::move(res_column);
}
template <typename Element>
static NO_SANITIZE_UNDEFINED bool executeType(const ColumnPtr & mapped, const ColumnArray::Offsets & offsets, ColumnPtr & res_ptr)
{
/// Min and Max are implemented in a different function
static_assert(aggregate_operation != AggregateOperation::min && aggregate_operation != AggregateOperation::max);
using ResultType = ArrayAggregateResult<Element, aggregate_operation>;
using ColVecType = ColumnVectorOrDecimal<Element>;
using ColVecResultType = ColumnVectorOrDecimal<ResultType>;
@ -197,11 +250,6 @@ struct ArrayAggregateImpl
/// Just multiply the value by array size.
res[i] = x * static_cast<ResultType>(array_size);
}
else if constexpr (aggregate_operation == AggregateOperation::min ||
aggregate_operation == AggregateOperation::max)
{
res[i] = x;
}
else if constexpr (aggregate_operation == AggregateOperation::average)
{
if constexpr (is_decimal<Element>)
@ -292,20 +340,6 @@ struct ArrayAggregateImpl
{
aggregate_value += element;
}
else if constexpr (aggregate_operation == AggregateOperation::min)
{
if (element < aggregate_value)
{
aggregate_value = element;
}
}
else if constexpr (aggregate_operation == AggregateOperation::max)
{
if (element > aggregate_value)
{
aggregate_value = element;
}
}
else if constexpr (aggregate_operation == AggregateOperation::product)
{
if constexpr (is_decimal<Element>)
@ -360,74 +394,41 @@ struct ArrayAggregateImpl
static ColumnPtr execute(const ColumnArray & array, ColumnPtr mapped)
{
if constexpr (aggregate_operation == AggregateOperation::max || aggregate_operation == AggregateOperation::min)
{
MutableColumnPtr res;
const auto & column = array.getDataPtr();
const ColumnConst * const_column = checkAndGetColumn<ColumnConst>(&*column);
if (const_column)
{
res = const_column->getDataColumn().cloneEmpty();
}
else
{
res = column->cloneEmpty();
}
const IColumn::Offsets & offsets = array.getOffsets();
size_t pos = 0;
for (const auto & offset : offsets)
{
if (offset == pos)
{
res->insertDefault();
continue;
}
size_t current_max_or_min_index = pos;
++pos;
for (; pos < offset; ++pos)
{
int compare_result = column->compareAt(pos, current_max_or_min_index, *column, 1);
if (aggregate_operation == AggregateOperation::max && compare_result > 0)
{
current_max_or_min_index = pos;
}
else if (aggregate_operation == AggregateOperation::min && compare_result < 0)
{
current_max_or_min_index = pos;
}
}
res->insert((*column)[current_max_or_min_index]);
}
return res;
}
const IColumn::Offsets & offsets = array.getOffsets();
ColumnPtr res;
if (executeType<UInt8>(mapped, offsets, res) ||
executeType<UInt16>(mapped, offsets, res) ||
executeType<UInt32>(mapped, offsets, res) ||
executeType<UInt64>(mapped, offsets, res) ||
executeType<UInt128>(mapped, offsets, res) ||
executeType<UInt256>(mapped, offsets, res) ||
executeType<Int8>(mapped, offsets, res) ||
executeType<Int16>(mapped, offsets, res) ||
executeType<Int32>(mapped, offsets, res) ||
executeType<Int64>(mapped, offsets, res) ||
executeType<Int128>(mapped, offsets, res) ||
executeType<Int256>(mapped, offsets, res) ||
executeType<Float32>(mapped, offsets, res) ||
executeType<Float64>(mapped, offsets, res) ||
executeType<Decimal32>(mapped, offsets, res) ||
executeType<Decimal64>(mapped, offsets, res) ||
executeType<Decimal128>(mapped, offsets, res) ||
executeType<Decimal256>(mapped, offsets, res) ||
executeType<DateTime64>(mapped, offsets, res))
if constexpr (aggregate_operation == AggregateOperation::min || aggregate_operation == AggregateOperation::max)
{
executeMinOrMax(mapped, offsets, res);
return res;
}
else
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Unexpected column for arraySum: {}", mapped->getName());
{
if (executeType<UInt8>(mapped, offsets, res) ||
executeType<UInt16>(mapped, offsets, res) ||
executeType<UInt32>(mapped, offsets, res) ||
executeType<UInt64>(mapped, offsets, res) ||
executeType<UInt128>(mapped, offsets, res) ||
executeType<UInt256>(mapped, offsets, res) ||
executeType<Int8>(mapped, offsets, res) ||
executeType<Int16>(mapped, offsets, res) ||
executeType<Int32>(mapped, offsets, res) ||
executeType<Int64>(mapped, offsets, res) ||
executeType<Int128>(mapped, offsets, res) ||
executeType<Int256>(mapped, offsets, res) ||
executeType<Float32>(mapped, offsets, res) ||
executeType<Float64>(mapped, offsets, res) ||
executeType<Decimal32>(mapped, offsets, res) ||
executeType<Decimal64>(mapped, offsets, res) ||
executeType<Decimal128>(mapped, offsets, res) ||
executeType<Decimal256>(mapped, offsets, res) ||
executeType<DateTime64>(mapped, offsets, res))
{
return res;
}
}
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Unexpected column for arraySum: {}", mapped->getName());
}
};
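/// Illustrative note only (not part of the change above): arrayMin/arrayMax are the SQL functions built
/// on AggregateOperation::min/max; they now go through executeMinOrMax(), which picks the extremum row
/// via IColumn::compareAt and therefore handles any comparable element type, e.g.
///
///     SELECT arrayMin([3, 1, 2]);          -- 1
///     SELECT arrayMax(['a', 'c', 'b']);    -- 'c'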


@ -226,8 +226,17 @@ public:
if (overload == Overload::Origin)
origin_column = arguments[2];
const size_t time_zone_arg_num = (overload == Overload::Default) ? 2 : 3;
const auto & time_zone = extractTimeZoneFromFunctionArguments(arguments, time_zone_arg_num, 0);
const DateLUTImpl * time_zone_tmp;
if (isDateTimeOrDateTime64(time_column.type) || isDateTimeOrDateTime64(result_type))
{
const size_t time_zone_arg_num = (overload == Overload::Default) ? 2 : 3;
time_zone_tmp = &extractTimeZoneFromFunctionArguments(arguments, time_zone_arg_num, 0);
}
else /// As we convert Date to DateTime and perform the calculation, we don't need to take the time zone into account, so we use the default (UTC)
time_zone_tmp = &DateLUT::instance("UTC");
const DateLUTImpl & time_zone = *time_zone_tmp;
ColumnPtr result_column;
if (isDate(result_type))


@ -118,8 +118,8 @@ struct ReadSettings
size_t remote_read_min_bytes_for_seek = DBMS_DEFAULT_BUFFER_SIZE;
FileCachePtr remote_fs_cache;
std::optional<FileCacheKey> filecache_key;
bool remote_read_buffer_restrict_seek = false;
bool remote_read_buffer_use_external_buffer = false;
/// Bandwidth throttler to use during reading
ThrottlerPtr remote_throttler;
@ -141,6 +141,14 @@ struct ReadSettings
res.prefetch_buffer_size = std::min(std::max(1ul, file_size), prefetch_buffer_size);
return res;
}
ReadSettings withNestedBuffer() const
{
ReadSettings res = *this;
res.remote_read_buffer_restrict_seek = true;
res.remote_read_buffer_use_external_buffer = true;
return res;
}
};
ReadSettings getReadSettings();


@ -136,6 +136,8 @@ void WriteBufferFromPocoSocket::nextImpl()
SCOPE_EXIT({
ProfileEvents::increment(ProfileEvents::NetworkSendElapsedMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::NetworkSendBytes, bytes_written);
if (write_event != ProfileEvents::end())
ProfileEvents::increment(write_event, bytes_written);
});
while (bytes_written < offset())


@ -121,7 +121,10 @@ private:
{
if (select.recursive_with)
for (const auto & child : select.with()->children)
with_aliases.insert(child->as<ASTWithElement>()->name);
{
if (typeid_cast<ASTWithElement *>(child.get()))
with_aliases.insert(child->as<ASTWithElement>()->name);
}
if (select.tables())
tryVisit<ASTTablesInSelectQuery>(select.refTables());


@ -10,7 +10,7 @@
#include <Common/SensitiveDataMasker.h>
#include <Common/Macros.h>
#include <Common/EventNotifier.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/getNumberOfCPUCoresToUse.h>
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/Throttler.h>
@ -3377,10 +3377,13 @@ size_t Context::getPrefetchThreadpoolSize() const
ThreadPool & Context::getBuildVectorSimilarityIndexThreadPool() const
{
callOnce(shared->build_vector_similarity_index_threadpool_initialized, [&] {
callOnce(
shared->build_vector_similarity_index_threadpool_initialized,
[&]
{
size_t pool_size = shared->server_settings.max_build_vector_similarity_index_thread_pool_size > 0
? shared->server_settings.max_build_vector_similarity_index_thread_pool_size
: getNumberOfPhysicalCPUCores();
: getNumberOfCPUCoresToUse();
shared->build_vector_similarity_index_threadpool = std::make_unique<ThreadPool>(
CurrentMetrics::BuildVectorSimilarityIndexThreads,
CurrentMetrics::BuildVectorSimilarityIndexThreadsActive,


@ -25,6 +25,8 @@ ColumnsDescription FilesystemCacheLogElement::getColumnsDescription()
std::make_shared<DataTypeNumber<UInt64>>(),
};
auto low_cardinality_string = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
return ColumnsDescription
{
{"hostname", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()), "Hostname"},
@ -39,7 +41,7 @@ ColumnsDescription FilesystemCacheLogElement::getColumnsDescription()
{"size", std::make_shared<DataTypeUInt64>(), "Read size"},
{"read_type", std::make_shared<DataTypeString>(), "Read type: READ_FROM_CACHE, READ_FROM_FS_AND_DOWNLOADED_TO_CACHE, READ_FROM_FS_BYPASSING_CACHE"},
{"read_from_cache_attempted", std::make_shared<DataTypeUInt8>(), "Whether reading from cache was attempted"},
{"ProfileEvents", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeUInt64>()), "Profile events collected while reading this file segment"},
{"ProfileEvents", std::make_shared<DataTypeMap>(low_cardinality_string, std::make_shared<DataTypeUInt64>()), "Profile events collected while reading this file segment"},
{"read_buffer_id", std::make_shared<DataTypeString>(), "Internal implementation read buffer id"},
};
}


@ -16,6 +16,10 @@ bool less(const Field & lhs, const Field & rhs, int direction)
bool equals(const Field & lhs, const Field & rhs)
{
/// This will treat NaNs as equal
if (lhs.getType() == rhs.getType())
return lhs == rhs;
return applyVisitor(FieldVisitorAccurateEquals(), lhs, rhs);
}


@ -76,6 +76,8 @@
#include <Databases/DDLDependencyVisitor.h>
#include <Databases/NormalizeAndEvaluateConstantsVisitor.h>
#include <Dictionaries/getDictionaryConfigurationFromAST.h>
#include <Compression/CompressionFactory.h>
#include <Interpreters/InterpreterDropQuery.h>
@ -137,6 +139,7 @@ namespace Setting
extern const SettingsUInt64 max_parser_depth;
extern const SettingsBool restore_replace_external_engines_to_null;
extern const SettingsBool restore_replace_external_table_functions_to_null;
extern const SettingsBool restore_replace_external_dictionary_source_to_null;
}
namespace ErrorCodes
@ -866,6 +869,26 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
{
properties.indices = as_storage_metadata->getSecondaryIndices();
properties.projections = as_storage_metadata->getProjections().clone();
/// CREATE TABLE AS should copy PRIMARY KEY, ORDER BY, and similar clauses.
/// Note: only supported when the source table engine uses the new syntax.
if (const auto * merge_tree_data = dynamic_cast<const MergeTreeData *>(as_storage.get()))
{
if (merge_tree_data->format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
if (!create.storage->primary_key && as_storage_metadata->isPrimaryKeyDefined() && as_storage_metadata->hasPrimaryKey())
create.storage->set(create.storage->primary_key, as_storage_metadata->getPrimaryKeyAST()->clone());
if (!create.storage->partition_by && as_storage_metadata->isPartitionKeyDefined() && as_storage_metadata->hasPartitionKey())
create.storage->set(create.storage->partition_by, as_storage_metadata->getPartitionKeyAST()->clone());
if (!create.storage->order_by && as_storage_metadata->isSortingKeyDefined() && as_storage_metadata->hasSortingKey())
create.storage->set(create.storage->order_by, as_storage_metadata->getSortingKeyAST()->clone());
if (!create.storage->sample_by && as_storage_metadata->isSamplingKeyDefined() && as_storage_metadata->hasSamplingKey())
create.storage->set(create.storage->sample_by, as_storage_metadata->getSamplingKeyAST()->clone());
}
}
}
else
{
@ -1155,6 +1178,22 @@ namespace
storage.set(storage.engine, engine_ast);
}
void setNullDictionarySourceIfExternal(ASTCreateQuery & create_query)
{
ASTDictionary & dict = *create_query.dictionary;
if (Poco::toLower(dict.source->name) == "clickhouse")
{
auto config = getDictionaryConfigurationFromAST(create_query, Context::getGlobalContextInstance());
auto info = getInfoIfClickHouseDictionarySource(config, Context::getGlobalContextInstance());
if (info && info->is_local)
return;
}
auto source_ast = std::make_shared<ASTFunctionWithKeyValueArguments>();
source_ast->name = "null";
source_ast->elements = std::make_shared<ASTExpressionList>();
source_ast->children.push_back(source_ast->elements);
dict.set(dict.source, source_ast);
}
}
void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
@ -1181,6 +1220,9 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
return;
}
if (create.is_dictionary && getContext()->getSettingsRef()[Setting::restore_replace_external_dictionary_source_to_null])
setNullDictionarySourceIfExternal(create);
if (create.is_dictionary || create.is_ordinary_view || create.is_live_view || create.is_window_view)
return;


@ -3,7 +3,7 @@
#include <Common/DNSResolver.h>
#include <Common/ActionLock.h>
#include <Common/typeid_cast.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/getNumberOfCPUCoresToUse.h>
#include <Common/SymbolIndex.h>
#include <Common/ThreadPool.h>
#include <Common/escapeForFileName.h>
@ -942,7 +942,7 @@ void InterpreterSystemQuery::restartReplicas(ContextMutablePtr system_context)
if (replica_names.empty())
return;
size_t threads = std::min(static_cast<size_t>(getNumberOfPhysicalCPUCores()), replica_names.size());
size_t threads = std::min(static_cast<size_t>(getNumberOfCPUCoresToUse()), replica_names.size());
LOG_DEBUG(log, "Will restart {} replicas using {} threads", replica_names.size(), threads);
ThreadPool pool(CurrentMetrics::RestartReplicaThreads, CurrentMetrics::RestartReplicaThreadsActive, CurrentMetrics::RestartReplicaThreadsScheduled, threads);


@ -513,12 +513,6 @@ static void validateUpdateColumns(
throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "There is no column {} in table", backQuote(column_name));
}
}
else if (storage_columns.getColumn(GetColumnsOptions::Ordinary, column_name).type->hasDynamicSubcolumns())
{
throw Exception(ErrorCodes::CANNOT_UPDATE_COLUMN,
"Cannot update column {} with type {}: updates of columns with dynamic subcolumns are not supported",
backQuote(column_name), storage_columns.getColumn(GetColumnsOptions::Ordinary, column_name).type->getName());
}
}
}
@ -1365,6 +1359,21 @@ void MutationsInterpreter::validate()
}
}
const auto & storage_columns = source.getStorageSnapshot(metadata_snapshot, context)->metadata->getColumns();
for (const auto & command : commands)
{
for (const auto & [column_name, _] : command.column_to_update_expression)
{
auto column = storage_columns.tryGetColumn(GetColumnsOptions::Ordinary, column_name);
if (column && column->type->hasDynamicSubcolumns())
{
throw Exception(ErrorCodes::CANNOT_UPDATE_COLUMN,
"Cannot update column {} with type {}: updates of columns with dynamic subcolumns are not supported",
backQuote(column_name), storage_columns.getColumn(GetColumnsOptions::Ordinary, column_name).type->getName());
}
}
}
QueryPlan plan;
initQueryPlan(stages.front(), plan);


@ -90,6 +90,8 @@ ColumnsDescription PartLogElement::getColumnsDescription()
}
);
auto low_cardinality_string = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
ColumnsWithTypeAndName columns_with_type_and_name;
return ColumnsDescription
@ -142,7 +144,7 @@ ColumnsDescription PartLogElement::getColumnsDescription()
{"error", std::make_shared<DataTypeUInt16>(), "The error code of the occurred exception."},
{"exception", std::make_shared<DataTypeString>(), "Text message of the occurred error."},
{"ProfileEvents", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeUInt64>()), "All the profile events captured during this operation."},
{"ProfileEvents", std::make_shared<DataTypeMap>(low_cardinality_string, std::make_shared<DataTypeUInt64>()), "All the profile events captured during this operation."},
};
}


@ -33,6 +33,8 @@ ColumnsDescription QueryViewsLogElement::getColumnsDescription()
{"Live", static_cast<Int8>(ViewType::LIVE)},
{"Window", static_cast<Int8>(ViewType::WINDOW)}});
auto low_cardinality_string = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
return ColumnsDescription
{
{"hostname", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()), "Hostname of the server executing the query."},
@ -53,7 +55,7 @@ ColumnsDescription QueryViewsLogElement::getColumnsDescription()
{"written_rows", std::make_shared<DataTypeUInt64>(), "Number of written rows."},
{"written_bytes", std::make_shared<DataTypeUInt64>(), "Number of written bytes."},
{"peak_memory_usage", std::make_shared<DataTypeInt64>(), "The maximum difference between the amount of allocated and freed memory in context of this view."},
{"ProfileEvents", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeUInt64>()), "ProfileEvents that measure different metrics. The description of them could be found in the table system.events."},
{"ProfileEvents", std::make_shared<DataTypeMap>(low_cardinality_string, std::make_shared<DataTypeUInt64>()), "ProfileEvents that measure different metrics. The description of them could be found in the table system.events."},
{"status", std::move(view_status_datatype), "Status of the view. Values: "
"'QueryStart' = 1 — Successful start the view execution. Should not appear, "


@ -154,6 +154,7 @@ namespace Setting
extern const SettingsBool use_query_cache;
extern const SettingsBool wait_for_async_insert;
extern const SettingsSeconds wait_for_async_insert_timeout;
extern const SettingsBool enable_secure_identifiers;
}
namespace ErrorCodes
@ -997,6 +998,14 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
InterpreterSetQuery::applySettingsFromQuery(ast, context);
validateAnalyzerSettings(ast, settings[Setting::allow_experimental_analyzer]);
if (settings[Setting::enable_secure_identifiers])
{
WriteBufferFromOwnString buf;
IAST::FormatSettings enable_secure_identifiers_settings(buf, true);
enable_secure_identifiers_settings.enable_secure_identifiers = true;
ast->format(enable_secure_identifiers_settings);
}
if (auto * insert_query = ast->as<ASTInsertQuery>())
insert_query->tail = istr;


@ -216,7 +216,8 @@ bool ASTAuthenticationData::hasSecretParts() const
auto auth_type = *type;
if ((auth_type == AuthenticationType::PLAINTEXT_PASSWORD)
|| (auth_type == AuthenticationType::SHA256_PASSWORD)
|| (auth_type == AuthenticationType::DOUBLE_SHA1_PASSWORD))
|| (auth_type == AuthenticationType::DOUBLE_SHA1_PASSWORD)
|| (auth_type == AuthenticationType::BCRYPT_PASSWORD))
return true;
return childrenHaveSecretParts();


@ -8,6 +8,7 @@
#include <Poco/String.h>
#include <Common/SensitiveDataMasker.h>
#include <Common/SipHash.h>
#include <algorithm>
namespace DB
{
@ -17,6 +18,7 @@ namespace ErrorCodes
extern const int TOO_BIG_AST;
extern const int TOO_DEEP_AST;
extern const int UNKNOWN_ELEMENT_IN_AST;
extern const int BAD_ARGUMENTS;
}
@ -220,6 +222,7 @@ String IAST::getColumnNameWithoutAlias() const
void IAST::FormatSettings::writeIdentifier(const String & name, bool ambiguous) const
{
checkIdentifier(name);
bool must_quote
= (identifier_quoting_rule == IdentifierQuotingRule::Always
|| (ambiguous && identifier_quoting_rule == IdentifierQuotingRule::WhenNecessary));
@ -260,6 +263,21 @@ void IAST::FormatSettings::writeIdentifier(const String & name, bool ambiguous)
}
}
void IAST::FormatSettings::checkIdentifier(const String & name) const
{
if (enable_secure_identifiers)
{
bool is_secure_identifier = std::all_of(name.begin(), name.end(), [](char ch) { return std::isalnum(ch) || ch == '_'; });
if (!is_secure_identifier)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Not a secure identifier: `{}`, a secure identifier must contain only underscore and alphanumeric characters",
name);
}
}
}
void IAST::dumpTree(WriteBuffer & ostr, size_t indent) const
{
String indent_str(indent, '-');
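/// Illustrative sketch only (not part of the change above): given an IAST::FormatSettings instance
/// `settings` constructed with enable_secure_identifiers = true, checkIdentifier() accepts names made of
/// alphanumeric characters and '_' and throws BAD_ARGUMENTS for anything else:
///
///     settings.checkIdentifier("user_events_2024");   /// passes
///     settings.checkIdentifier("user-events");        /// throws BAD_ARGUMENTS ('-' is not allowed)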


@ -202,6 +202,7 @@ public:
char nl_or_ws; /// Newline or whitespace.
LiteralEscapingStyle literal_escaping_style;
bool print_pretty_type_names;
bool enable_secure_identifiers;
explicit FormatSettings(
WriteBuffer & ostr_,
@ -211,7 +212,8 @@ public:
IdentifierQuotingStyle identifier_quoting_style_ = IdentifierQuotingStyle::Backticks,
bool show_secrets_ = true,
LiteralEscapingStyle literal_escaping_style_ = LiteralEscapingStyle::Regular,
bool print_pretty_type_names_ = false)
bool print_pretty_type_names_ = false,
bool enable_secure_identifiers_ = false)
: ostr(ostr_)
, one_line(one_line_)
, hilite(hilite_)
@ -221,6 +223,7 @@ public:
, nl_or_ws(one_line ? ' ' : '\n')
, literal_escaping_style(literal_escaping_style_)
, print_pretty_type_names(print_pretty_type_names_)
, enable_secure_identifiers(enable_secure_identifiers_)
{
}
@ -234,10 +237,12 @@ public:
, nl_or_ws(other.nl_or_ws)
, literal_escaping_style(other.literal_escaping_style)
, print_pretty_type_names(other.print_pretty_type_names)
, enable_secure_identifiers(other.enable_secure_identifiers)
{
}
void writeIdentifier(const String & name, bool ambiguous) const;
void checkIdentifier(const String & name) const;
};
/// State. For example, a set of nodes can be remembered, which we already walk through.


@ -178,22 +178,6 @@ void Chunk::append(const Chunk & chunk, size_t from, size_t length)
setColumns(std::move(mutable_columns), rows);
}
void ChunkMissingValues::setBit(size_t column_idx, size_t row_idx)
{
RowsBitMask & mask = rows_mask_by_column_id[column_idx];
mask.resize(row_idx + 1);
mask[row_idx] = true;
}
const ChunkMissingValues::RowsBitMask & ChunkMissingValues::getDefaultsBitmask(size_t column_idx) const
{
static RowsBitMask none;
auto it = rows_mask_by_column_id.find(column_idx);
if (it != rows_mask_by_column_id.end())
return it->second;
return none;
}
void convertToFullIfConst(Chunk & chunk)
{
size_t num_rows = chunk.getNumRows();


@ -153,28 +153,6 @@ public:
using AsyncInsertInfoPtr = std::shared_ptr<AsyncInsertInfo>;
/// Extension to support delayed defaults. AddingDefaultsProcessor uses it to replace missing values with column defaults.
class ChunkMissingValues : public ChunkInfoCloneable<ChunkMissingValues>
{
public:
ChunkMissingValues(const ChunkMissingValues & other) = default;
using RowsBitMask = std::vector<bool>; /// a bit per row for a column
const RowsBitMask & getDefaultsBitmask(size_t column_idx) const;
void setBit(size_t column_idx, size_t row_idx);
bool empty() const { return rows_mask_by_column_id.empty(); }
size_t size() const { return rows_mask_by_column_id.size(); }
void clear() { rows_mask_by_column_id.clear(); }
private:
using RowsMaskByColumnId = std::unordered_map<size_t, RowsBitMask>;
/// If rows_mask_by_column_id[column_id][row_id] is true related value in Block should be replaced with column default.
/// It could contain less columns and rows then related block.
RowsMaskByColumnId rows_mask_by_column_id;
};
/// Converts all columns to full serialization in chunk.
/// It's needed, when you have to access to the internals of the column,
/// or when you need to perform operation with two columns


@ -6,6 +6,7 @@
#include <Processors/Formats/InputFormatErrorsLogger.h>
#include <Processors/SourceWithKeyCondition.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Core/BlockMissingValues.h>
namespace DB
@ -43,11 +44,7 @@ public:
virtual void setReadBuffer(ReadBuffer & in_);
virtual void resetReadBuffer() { in = nullptr; }
virtual const BlockMissingValues & getMissingValues() const
{
static const BlockMissingValues none;
return none;
}
virtual const BlockMissingValues * getMissingValues() const { return nullptr; }
/// Must be called from ParallelParsingInputFormat after readSuffix
ColumnMappingPtr getColumnMapping() const { return column_mapping; }


@ -56,7 +56,10 @@ bool isParseError(int code)
}
IRowInputFormat::IRowInputFormat(Block header, ReadBuffer & in_, Params params_)
: IInputFormat(std::move(header), &in_), serializations(getPort().getHeader().getSerializations()), params(params_)
: IInputFormat(std::move(header), &in_)
, serializations(getPort().getHeader().getSerializations())
, params(params_)
, block_missing_values(getPort().getHeader().columns())
{
}


@ -78,7 +78,7 @@ protected:
void logError();
const BlockMissingValues & getMissingValues() const override { return block_missing_values; }
const BlockMissingValues * getMissingValues() const override { return &block_missing_values; }
size_t getRowNum() const { return total_rows; }


@ -24,7 +24,10 @@ namespace ErrorCodes
}
ArrowBlockInputFormat::ArrowBlockInputFormat(ReadBuffer & in_, const Block & header_, bool stream_, const FormatSettings & format_settings_)
: IInputFormat(header_, &in_), stream{stream_}, format_settings(format_settings_)
: IInputFormat(header_, &in_)
, stream(stream_)
, block_missing_values(getPort().getHeader().columns())
, format_settings(format_settings_)
{
}
@ -108,9 +111,9 @@ void ArrowBlockInputFormat::resetParser()
block_missing_values.clear();
}
const BlockMissingValues & ArrowBlockInputFormat::getMissingValues() const
const BlockMissingValues * ArrowBlockInputFormat::getMissingValues() const
{
return block_missing_values;
return &block_missing_values;
}
static std::shared_ptr<arrow::RecordBatchReader> createStreamReader(ReadBuffer & in)

View File

@ -25,7 +25,7 @@ public:
String getName() const override { return "ArrowBlockInputFormat"; }
const BlockMissingValues & getMissingValues() const override;
const BlockMissingValues * getMissingValues() const override;
size_t getApproxBytesReadForChunk() const override { return approx_bytes_read_for_chunk; }


@ -9,6 +9,7 @@
#include <Core/Block.h>
#include <arrow/table.h>
#include <Formats/FormatSettings.h>
#include <Core/BlockMissingValues.h>
namespace DB
{


@ -331,9 +331,6 @@ ConstantExpressionTemplate::TemplateStructure::TemplateStructure(LiteralsInfo &
for (size_t i = 0; i < replaced_literals.size(); ++i)
{
const LiteralInfo & info = replaced_literals[i];
if (info.literal->begin.value() < prev_end)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot replace literals");
while (prev_end < info.literal->begin.value())
{
tokens.emplace_back(prev_end->begin, prev_end->size());


@ -84,6 +84,7 @@ JSONColumnsBlockInputFormatBase::JSONColumnsBlockInputFormatBase(
, fields(header_.getNamesAndTypes())
, serializations(header_.getSerializations())
, reader(std::move(reader_))
, block_missing_values(getPort().getHeader().columns())
{
name_to_index = getPort().getHeader().getNamesToIndexesMap();
}


@ -52,7 +52,7 @@ public:
void setReadBuffer(ReadBuffer & in_) override;
const BlockMissingValues & getMissingValues() const override { return block_missing_values; }
const BlockMissingValues * getMissingValues() const override { return &block_missing_values; }
size_t getApproxBytesReadForChunk() const override { return approx_bytes_read_for_chunk; }


@ -23,7 +23,10 @@ public:
0,
settings,
settings.defaults_for_omitted_fields ? &block_missing_values : nullptr))
, header(header_) {}
, header(header_)
, block_missing_values(header.columns())
{
}
String getName() const override { return "Native"; }
@ -56,7 +59,7 @@ public:
IInputFormat::setReadBuffer(in_);
}
const BlockMissingValues & getMissingValues() const override { return block_missing_values; }
const BlockMissingValues * getMissingValues() const override { return &block_missing_values; }
size_t getApproxBytesReadForChunk() const override { return approx_bytes_read_for_chunk; }


@ -846,7 +846,10 @@ static void updateIncludeTypeIds(
}
NativeORCBlockInputFormat::NativeORCBlockInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_)
: IInputFormat(std::move(header_), &in_), format_settings(format_settings_), skip_stripes(format_settings.orc.skip_stripes)
: IInputFormat(std::move(header_), &in_)
, block_missing_values(getPort().getHeader().columns())
, format_settings(format_settings_)
, skip_stripes(format_settings.orc.skip_stripes)
{
}
@ -975,9 +978,9 @@ void NativeORCBlockInputFormat::resetParser()
block_missing_values.clear();
}
const BlockMissingValues & NativeORCBlockInputFormat::getMissingValues() const
const BlockMissingValues * NativeORCBlockInputFormat::getMissingValues() const
{
return block_missing_values;
return &block_missing_values;
}
NativeORCSchemaReader::NativeORCSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
