Merge branch 'master' into keeper-linearizable-reads

Antonio Andelic 2022-09-15 18:40:23 +02:00, committed by GitHub
commit fe78c063f6
29 changed files with 185 additions and 42 deletions

View File

@@ -61,7 +61,7 @@ function configure
cp -rv right/config left ||:
# Start a temporary server to rename the tables
while pkill clickhouse-serv; do echo . ; sleep 1 ; done
while pkill -f clickhouse-serv ; do echo . ; sleep 1 ; done
echo all killed
set -m # Spawn temporary in its own process groups
@@ -88,7 +88,7 @@ function configure
clickhouse-client --port $LEFT_SERVER_PORT --query "create database test" ||:
clickhouse-client --port $LEFT_SERVER_PORT --query "rename table datasets.hits_v1 to test.hits" ||:
while pkill clickhouse-serv; do echo . ; sleep 1 ; done
while pkill -f clickhouse-serv ; do echo . ; sleep 1 ; done
echo all killed
# Make copies of the original db for both servers. Use hardlinks instead
@@ -106,7 +106,7 @@ function configure
function restart
{
while pkill clickhouse-serv; do echo . ; sleep 1 ; done
while pkill -f clickhouse-serv ; do echo . ; sleep 1 ; done
echo all killed
# Change the jemalloc settings here.
@@ -1400,7 +1400,7 @@ case "$stage" in
while env kill -- -$watchdog_pid ; do sleep 1; done
# Stop the servers to free memory for the subsequent query analysis.
while pkill clickhouse-serv; do echo . ; sleep 1 ; done
while pkill -f clickhouse-serv ; do echo . ; sleep 1 ; done
echo Servers stopped.
;&
"analyze_queries")

View File

@@ -134,6 +134,13 @@ Example of configuration for versions later or equal to 22.8:
<max_size>10000000</max_size>
</cache>
</disks>
<policies>
<volumes>
<main>
<disk>cache</disk>
</main>
</volumes>
</policies>
</storage_configuration>
```
@@ -151,6 +158,13 @@ Example of configuration for versions earlier than 22.8:
<data_cache_size>10000000</data_cache_size>
</s3>
</disks>
<policies>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</policies>
</storage_configuration>
```
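
A table can then be attached to the cached disk through such a policy. A minimal sketch, assuming the policy above is given a name such as `s3_cache_policy` (the name is not shown in this excerpt) and using a hypothetical table:

``` sql
-- hypothetical table and policy name; the policy wraps the cache disk defined above
CREATE TABLE cached_table
(
    key UInt64,
    value String
)
ENGINE = MergeTree
ORDER BY key
SETTINGS storage_policy = 's3_cache_policy';
```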

View File

@@ -13,7 +13,7 @@ Creates a table from a file. This table function is similar to [url](../../sql-r
**Syntax**
``` sql
file(path, format, structure)
file(path [,format] [,structure])
```
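
With `format` and `structure` now optional, both can be inferred automatically; a minimal sketch using a hypothetical file name:

``` sql
-- format inferred from the .csv extension, structure inferred from the data
SELECT * FROM file('data.csv');

-- fully explicit form, equivalent to the previously required syntax
SELECT * FROM file('data.csv', 'CSV', 'id UInt64, name String');
```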
**Parameters**

View File

@@ -11,7 +11,7 @@ Provides table-like interface to select/insert files in [Amazon S3](https://aws.
**Syntax**
``` sql
s3(path, [aws_access_key_id, aws_secret_access_key,] format, structure, [compression])
s3(path [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
```
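
For a public bucket the credentials, format, and structure can all be omitted; a sketch with a hypothetical bucket URL:

``` sql
-- format and structure inferred; no credentials needed for a public bucket
SELECT count() FROM s3('https://my-bucket.s3.amazonaws.com/data/*.parquet');
```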
**Arguments**

View File

@@ -10,7 +10,7 @@ Allows processing files from [Amazon S3](https://aws.amazon.com/s3/) in parallel
**Syntax**
``` sql
s3Cluster(cluster_name, source, [access_key_id, secret_access_key,] format, structure)
s3Cluster(cluster_name, source [,access_key_id, secret_access_key] [,format] [,structure])
```
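
A sketch with a hypothetical cluster name and bucket, relying on the now-optional `format` and `structure`:

``` sql
-- each node of my_cluster processes a subset of the matched keys
SELECT count() FROM s3Cluster('my_cluster', 'https://my-bucket.s3.amazonaws.com/data/*.parquet');
```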
**Arguments**

View File

@@ -13,7 +13,7 @@ sidebar_label: url
**Syntax**
``` sql
url(URL, format, structure)
url(URL [,format] [,structure])
```
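
The same pattern applies to `url`; a sketch with a hypothetical endpoint:

``` sql
-- format inferred from the URL extension, structure inferred from the data
SELECT * FROM url('https://example.com/data.csv');
```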
**Parameters**

View File

@@ -13,7 +13,7 @@ sidebar_label: file
**Syntax**
``` sql
file(path, format, structure)
file(path [,format] [,structure])
```
**Parameters**

View File

@@ -11,7 +11,7 @@ sidebar_label: s3
**Syntax**
``` sql
s3(path, [aws_access_key_id, aws_secret_access_key,] format, structure, [compression])
s3(path [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
```
**Arguments**

View File

@@ -11,7 +11,7 @@ sidebar_label: s3Cluster
**Syntax**
``` sql
s3Cluster(cluster_name, source, [access_key_id, secret_access_key,] format, structure)
s3Cluster(cluster_name, source [,access_key_id, secret_access_key] [,format] [,structure])
```
**Arguments**

View File

@@ -13,7 +13,7 @@ sidebar_label: url
**Syntax**
``` sql
url(URL, format, structure)
url(URL [,format] [,structure])
```
**Parameters**

View File

@@ -481,7 +481,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, optimize_if_chain_to_multiif, false, "Replace if(cond1, then1, if(cond2, ...)) chains to multiIf. Currently it's not beneficial for numeric types.", 0) \
M(Bool, optimize_multiif_to_if, true, "Replace 'multiIf' with only one condition to 'if'.", 0) \
M(Bool, optimize_if_transform_strings_to_enum, false, "Replaces string-type arguments in If and Transform to enum. Disabled by default cause it could make inconsistent change in distributed query that would lead to its fail.", 0) \
M(Bool, optimize_monotonous_functions_in_order_by, true, "Replace monotonous function with its argument in ORDER BY", 0) \
M(Bool, optimize_monotonous_functions_in_order_by, false, "Replace monotonous function with its argument in ORDER BY", 0) \
M(Bool, optimize_functions_to_subcolumns, false, "Transform functions to subcolumns, if possible, to reduce amount of read data. E.g. 'length(arr)' -> 'arr.size0', 'col IS NULL' -> 'col.null' ", 0) \
M(Bool, optimize_using_constraints, false, "Use constraints for query optimization", 0) \
M(Bool, optimize_substitute_columns, false, "Use constraints for column substitution", 0) \
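
With the default flipped to `false`, the ORDER BY rewrite becomes opt-in; a sketch of enabling it per query on a hypothetical table where substituting the monotonic function is known to be safe:

``` sql
-- replaces toStartOfDay(dt) with dt in ORDER BY only when explicitly requested
SELECT *
FROM events
ORDER BY toStartOfDay(dt)
SETTINGS optimize_monotonous_functions_in_order_by = 1;
```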

View File

@@ -143,9 +143,11 @@ void CachedOnDiskReadBufferFromFile::initialize(size_t offset, size_t size)
}
CachedOnDiskReadBufferFromFile::ImplementationBufferPtr
CachedOnDiskReadBufferFromFile::getCacheReadBuffer(size_t offset) const
CachedOnDiskReadBufferFromFile::getCacheReadBuffer(const FileSegment & file_segment) const
{
auto path = cache->getPathInLocalCache(cache_key, offset, is_persistent);
/// Use the is_persistent flag from the in-memory state of the file segment,
/// because it is consistent with what is written on disk.
auto path = file_segment.getPathInLocalCache();
ReadSettings local_read_settings{settings};
/// Do not allow to use asynchronous version of LocalFSReadMethod.
@@ -237,8 +239,6 @@ bool CachedOnDiskReadBufferFromFile::canStartFromCache(size_t current_offset, co
CachedOnDiskReadBufferFromFile::ImplementationBufferPtr
CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & file_segment)
{
auto range = file_segment->range();
auto download_state = file_segment->state();
LOG_TEST(log, "getReadBufferForFileSegment: {}", file_segment->getInfoForLog());
@@ -247,7 +247,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
if (download_state == FileSegment::State::DOWNLOADED)
{
read_type = ReadType::CACHED;
return getCacheReadBuffer(range.left);
return getCacheReadBuffer(*file_segment);
}
else
{
@@ -280,7 +280,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
/// file_offset_of_buffer_end
read_type = ReadType::CACHED;
return getCacheReadBuffer(range.left);
return getCacheReadBuffer(*file_segment);
}
download_state = file_segment->wait();
@@ -289,7 +289,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
case FileSegment::State::DOWNLOADED:
{
read_type = ReadType::CACHED;
return getCacheReadBuffer(range.left);
return getCacheReadBuffer(*file_segment);
}
case FileSegment::State::EMPTY:
case FileSegment::State::PARTIALLY_DOWNLOADED:
@@ -305,7 +305,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
/// file_offset_of_buffer_end
read_type = ReadType::CACHED;
return getCacheReadBuffer(range.left);
return getCacheReadBuffer(*file_segment);
}
auto downloader_id = file_segment->getOrSetDownloader();
@@ -323,7 +323,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
read_type = ReadType::CACHED;
file_segment->resetDownloader();
return getCacheReadBuffer(range.left);
return getCacheReadBuffer(*file_segment);
}
if (file_segment->getCurrentWriteOffset() < file_offset_of_buffer_end)
@@ -339,7 +339,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
LOG_TEST(log, "Predownload. File segment info: {}", file_segment->getInfoForLog());
chassert(file_offset_of_buffer_end > file_segment->getCurrentWriteOffset());
bytes_to_predownload = file_offset_of_buffer_end - file_segment->getCurrentWriteOffset();
chassert(bytes_to_predownload < range.size());
chassert(bytes_to_predownload < file_segment->range().size());
}
read_type = ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE;
@@ -354,7 +354,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
if (canStartFromCache(file_offset_of_buffer_end, *file_segment))
{
read_type = ReadType::CACHED;
return getCacheReadBuffer(range.left);
return getCacheReadBuffer(*file_segment);
}
else
{

View File

@@ -68,7 +68,7 @@ private:
ImplementationBufferPtr getReadBufferForFileSegment(FileSegmentPtr & file_segment);
ImplementationBufferPtr getCacheReadBuffer(size_t offset) const;
ImplementationBufferPtr getCacheReadBuffer(const FileSegment & file_segment) const;
std::optional<size_t> getLastNonDownloadedOffset() const;

View File

@@ -13,7 +13,6 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_FORMAT;
extern const int LOGICAL_ERROR;
}
void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf)
@@ -131,9 +130,6 @@ DiskObjectStorageMetadata::DiskObjectStorageMetadata(
void DiskObjectStorageMetadata::addObject(const String & path, size_t size)
{
if (!object_storage_root_path.empty() && path.starts_with(object_storage_root_path))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected relative path");
total_size += size;
storage_objects.emplace_back(path, size);
}

View File

@@ -61,18 +61,23 @@ public:
return host_fqdn_id;
}
std::string getQueueDir() const
{
return queue_dir;
}
void startup();
virtual void shutdown();
bool isCurrentlyActive() const { return initialized && !stop_flag; }
protected:
/// Returns cached ZooKeeper session (possibly expired).
ZooKeeperPtr tryGetZooKeeper() const;
/// If necessary, creates a new session and caches it.
ZooKeeperPtr getAndSetZooKeeper();
protected:
/// Iterates through queue tasks in ZooKeeper, runs execution of new tasks
void scheduleTasks(bool reinitialized);

View File

@@ -235,8 +235,10 @@ static void insertNull(IColumn & column, DataTypePtr type)
assert_cast<ColumnNullable &>(column).insertDefault();
}
static void insertUUID(IColumn & column, DataTypePtr /*type*/, const char * value, size_t size)
static void insertUUID(IColumn & column, DataTypePtr type, const char * value, size_t size)
{
if (!isUUID(type))
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert MessagePack UUID into column with type {}.", type->getName());
ReadBufferFromMemory buf(value, size);
UUID uuid;
readBinaryBigEndian(uuid.toUnderType().items[0], buf);

View File

@@ -205,9 +205,9 @@ static void fillStatusColumns(MutableColumns & res_columns, size_t & col,
void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
{
zkutil::ZooKeeperPtr zookeeper = context->getZooKeeper();
fs::path ddl_zookeeper_path = context->getConfigRef().getString("distributed_ddl.path", "/clickhouse/task_queue/ddl/");
auto& ddl_worker = context->getDDLWorker();
fs::path ddl_zookeeper_path = ddl_worker.getQueueDir();
zkutil::ZooKeeperPtr zookeeper = ddl_worker.getAndSetZooKeeper();
Strings ddl_task_paths = zookeeper->getChildren(ddl_zookeeper_path);
GetResponseFutures ddl_task_futures;
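
This storage backs the `system.distributed_ddl_queue` table, so its contents now come from the DDLWorker's own queue path and ZooKeeper session rather than from re-reading the config; a minimal query sketch, assuming distributed DDL is configured:

``` sql
SELECT * FROM system.distributed_ddl_queue LIMIT 5;
```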

View File

@@ -291,7 +291,9 @@ def main():
logging.info("Will try to fetch cache for our build")
try:
get_ccache_if_not_exists(ccache_path, s3_helper, pr_info.number, TEMP_PATH)
get_ccache_if_not_exists(
ccache_path, s3_helper, pr_info.number, TEMP_PATH, pr_info.release_pr
)
except Exception as e:
# In case there are issues with ccache, remove the path and do not fail a build
logging.info("Failed to get ccache, building without it. Error: %s", e)

View File

@@ -11,6 +11,7 @@ import requests # type: ignore
from compress_files import decompress_fast, compress_fast
from env_helper import S3_DOWNLOAD, S3_BUILDS_BUCKET
from s3_helper import S3Helper
DOWNLOAD_RETRIES_COUNT = 5
@@ -57,12 +58,19 @@ def dowload_file_with_progress(url, path):
def get_ccache_if_not_exists(
path_to_ccache_dir, s3_helper, current_pr_number, temp_path
path_to_ccache_dir: str,
s3_helper: S3Helper,
current_pr_number: int,
temp_path: str,
release_pr: int,
) -> int:
"""returns: number of PR for downloaded PR. -1 if ccache not found"""
ccache_name = os.path.basename(path_to_ccache_dir)
cache_found = False
prs_to_check = [current_pr_number]
# Release PR is either 0 or defined
if release_pr:
prs_to_check.append(release_pr)
ccache_pr = -1
if current_pr_number != 0:
prs_to_check.append(0)

View File

@@ -125,7 +125,7 @@ if __name__ == "__main__":
logging.info("Will try to fetch cache for our build")
ccache_for_pr = get_ccache_if_not_exists(
cache_path, s3_helper, pr_info.number, temp_path
cache_path, s3_helper, pr_info.number, temp_path, pr_info.release_pr
)
upload_master_ccache = ccache_for_pr in (-1, 0)

View File

@@ -86,7 +86,7 @@ class PRInfo:
self.changed_files = set() # type: Set[str]
self.body = ""
self.diff_urls = []
self.release_pr = ""
self.release_pr = 0
ref = github_event.get("ref", "refs/head/master")
if ref and ref.startswith("refs/heads/"):
ref = ref[11:]

View File

@@ -38,6 +38,20 @@
<path>/jbod1/</path>
<max_size>1000000000</max_size>
</s3_with_cache_and_jbod>
<s3_r>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
</s3_r>
<s3_cache_r>
<type>cache</type>
<disk>s3_r</disk>
<path>/s3_cache_r/</path>
<max_size>1000000000</max_size>
<do_not_evict_index_and_mark_files>1</do_not_evict_index_and_mark_files>
</s3_cache_r>
</disks>
<policies>
<s3>
@@ -78,6 +92,13 @@
</main>
</volumes>
</s3_with_cache_and_jbod>
<s3_cache_r>
<volumes>
<main>
<disk>s3_cache_r</disk>
</main>
</volumes>
</s3_cache_r>
</policies>
</storage_configuration>

View File

@@ -6,7 +6,6 @@ import pytest
from helpers.cluster import ClickHouseCluster
from helpers.utility import generate_values, replace_config, SafeThread
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@@ -36,6 +35,7 @@ def cluster():
"/jbod1:size=2M",
],
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
@@ -742,3 +742,79 @@ def test_store_cleanup_disk_s3(cluster, node_name):
"CREATE TABLE s3_test UUID '00000000-1000-4000-8000-000000000001' (n UInt64) Engine=MergeTree() ORDER BY n SETTINGS storage_policy='s3';"
)
node.query("INSERT INTO s3_test SELECT 1")
@pytest.mark.parametrize("node_name", ["node"])
def test_cache_setting_compatibility(cluster, node_name):
node = cluster.instances[node_name]
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
node.query(
"CREATE TABLE s3_test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache_r';"
)
node.query(
"INSERT INTO s3_test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 500"
)
result = node.query("SYSTEM DROP FILESYSTEM CACHE")
result = node.query(
"SELECT count() FROM system.filesystem_cache WHERE cache_path LIKE '%persistent'"
)
assert int(result) == 0
node.query("SELECT * FROM s3_test")
result = node.query(
"SELECT count() FROM system.filesystem_cache WHERE cache_path LIKE '%persistent'"
)
assert int(result) > 0
config_path = os.path.join(
SCRIPT_DIR,
f"./{cluster.instances_dir_name}/node/configs/config.d/storage_conf.xml",
)
replace_config(
config_path,
"<do_not_evict_index_and_mark_files>1</do_not_evict_index_and_mark_files>",
"<do_not_evict_index_and_mark_files>0</do_not_evict_index_and_mark_files>",
)
result = node.query("DESCRIBE CACHE 's3_cache_r'")
assert result.strip().endswith("1")
node.restart_clickhouse()
result = node.query("DESCRIBE CACHE 's3_cache_r'")
assert result.strip().endswith("0")
result = node.query(
"SELECT count() FROM system.filesystem_cache WHERE cache_path LIKE '%persistent'"
)
assert int(result) > 0
node.query("SELECT * FROM s3_test FORMAT Null")
assert not node.contains_in_log("No such file or directory: Cache info:")
replace_config(
config_path,
"<do_not_evict_index_and_mark_files>0</do_not_evict_index_and_mark_files>",
"<do_not_evict_index_and_mark_files>1</do_not_evict_index_and_mark_files>",
)
result = node.query(
"SELECT count() FROM system.filesystem_cache WHERE cache_path LIKE '%persistent'"
)
assert int(result) > 0
node.restart_clickhouse()
result = node.query("DESCRIBE CACHE 's3_cache_r'")
assert result.strip().endswith("1")
node.query("SELECT * FROM s3_test FORMAT Null")
assert not node.contains_in_log("No such file or directory: Cache info:")

View File

@@ -0,0 +1,2 @@
2020-01-01 01:00:00 1
2020-01-01 01:00:00 999

View File

@@ -0,0 +1,7 @@
SELECT
toStartOfHour(c1) AS _c1,
c2
FROM values((toDateTime('2020-01-01 01:01:01'), 999), (toDateTime('2020-01-01 01:01:59'), 1))
ORDER BY
_c1 ASC,
c2 ASC

View File

@@ -17,7 +17,7 @@ INSERT INTO test_table(timestamp, value) SELECT toDateTime('2020-01-01 12:00:00'
INSERT INTO test_table(timestamp, value) SELECT toDateTime('2020-01-02 12:00:00'), 1 FROM numbers(10);
INSERT INTO test_table(timestamp, value) SELECT toDateTime('2020-01-03 12:00:00'), 1 FROM numbers(10);
set optimize_respect_aliases = 1;
set optimize_respect_aliases = 1, optimize_monotonous_functions_in_order_by = 1;
SELECT 'test-partition-prune';
SELECT COUNT() = 10 FROM test_table WHERE day = '2020-01-01' SETTINGS max_rows_to_read = 10;

View File

@@ -56,7 +56,13 @@ ENGINE = MergeTree ORDER BY (toStartOfDay(dt), d);
INSERT INTO t_read_in_order SELECT toDateTime('2020-10-10 00:00:00') + number, 1 / (number % 100 + 1), number FROM numbers(1000);
EXPLAIN PIPELINE SELECT toStartOfDay(dt) as date, d FROM t_read_in_order ORDER BY date, round(d) LIMIT 5;
SELECT toStartOfDay(dt) as date, d FROM t_read_in_order ORDER BY date, round(d) LIMIT 5;
SELECT * from (
SELECT toStartOfDay(dt) as date, d FROM t_read_in_order ORDER BY date, round(d) LIMIT 50000000000
-- subquery with limit 50000000000 to stabilize a test result and prevent order by d pushdown
) order by d limit 5;
EXPLAIN PIPELINE SELECT toStartOfDay(dt) as date, d FROM t_read_in_order ORDER BY date, round(d) LIMIT 5;
SELECT toStartOfDay(dt) as date, d FROM t_read_in_order WHERE date = '2020-10-10' ORDER BY round(d) LIMIT 5;
SELECT * from (
SELECT toStartOfDay(dt) as date, d FROM t_read_in_order WHERE date = '2020-10-10' ORDER BY round(d) LIMIT 50000000000
-- subquery with limit 50000000000 to stabilize a test result and prevent order by d pushdown
) order by d limit 5;

View File

@@ -0,0 +1,4 @@
-- Tags: no-parallel, no-fasttest
insert into function file(02422_data.msgpack) select toUUID('f4cdd80d-5d15-4bdc-9527-adcca635ec1f') as uuid settings output_format_msgpack_uuid_representation='ext';
select * from file(02422_data.msgpack, auto, 'x Int32'); -- {serverError ILLEGAL_COLUMN}
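
For contrast, the matching positive case (not part of the test above, shown only as a hedged sketch) reads the value back into a genuine UUID column:

``` sql
-- assumed to succeed because the target column type passes the new isUUID check
select * from file(02422_data.msgpack, auto, 'x UUID');
```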