Merge branch 'master' into async-read-from-socket

Nikolai Kochetov 2020-12-15 21:04:47 +03:00
commit 4905201985
75 changed files with 623 additions and 261 deletions


@ -81,7 +81,7 @@
* Install script should always create subdirs in config folders. This is only relevant for Docker build with custom config. [#16936](https://github.com/ClickHouse/ClickHouse/pull/16936) ([filimonov](https://github.com/filimonov)).
* Correct grammar in error message in JSONEachRow, JSONCompactEachRow, and RegexpRow input formats. [#17205](https://github.com/ClickHouse/ClickHouse/pull/17205) ([nico piderman](https://github.com/sneako)).
* Set default `host` and `port` parameters for `SOURCE(CLICKHOUSE(...))` to current instance and set default `user` value to `'default'`. [#16997](https://github.com/ClickHouse/ClickHouse/pull/16997) ([vdimir](https://github.com/vdimir)).
* Throw an informative error message when doing ATTACH/DETACH TABLE <DICTIONARY>. Before this PR, `detach table <dict>` works but leads to an ill-formed in-memory metadata. [#16885](https://github.com/ClickHouse/ClickHouse/pull/16885) ([Amos Bird](https://github.com/amosbird)).
* Throw an informative error message when doing `ATTACH/DETACH TABLE <DICTIONARY>`. Before this PR, `detach table <dict>` works but leads to an ill-formed in-memory metadata. [#16885](https://github.com/ClickHouse/ClickHouse/pull/16885) ([Amos Bird](https://github.com/amosbird)).
* Add cutToFirstSignificantSubdomainWithWWW(). [#16845](https://github.com/ClickHouse/ClickHouse/pull/16845) ([Azat Khuzhin](https://github.com/azat)).
* Server now refuses to start up with an exception message if a wrong config is given (`metric_log`.`collect_interval_milliseconds` is missing). [#16815](https://github.com/ClickHouse/ClickHouse/pull/16815) ([Ivan](https://github.com/abyss7)).
* Better exception message when configuration for distributed DDL is absent. This fixes [#5075](https://github.com/ClickHouse/ClickHouse/issues/5075). [#16769](https://github.com/ClickHouse/ClickHouse/pull/16769) ([Nikita Mikhaylov](https://github.com/nikitamikhaylov)).


@ -41,9 +41,10 @@ if (SANITIZE)
if (COMPILER_CLANG)
set (TSAN_FLAGS "${TSAN_FLAGS} -fsanitize-blacklist=${CMAKE_SOURCE_DIR}/tests/tsan_suppressions.txt")
else()
message (WARNING "TSAN suppressions was not passed to the compiler (since the compiler is not clang)")
message (WARNING "Use the following command to pass them manually:")
message (WARNING " export TSAN_OPTIONS=\"$TSAN_OPTIONS suppressions=${CMAKE_SOURCE_DIR}/tests/tsan_suppressions.txt\"")
set (MESSAGE "TSAN suppressions was not passed to the compiler (since the compiler is not clang)\n")
set (MESSAGE "${MESSAGE}Use the following command to pass them manually:\n")
set (MESSAGE "${MESSAGE} export TSAN_OPTIONS=\"$TSAN_OPTIONS suppressions=${CMAKE_SOURCE_DIR}/tests/tsan_suppressions.txt\"")
message (WARNING "${MESSAGE}")
endif()
@ -57,8 +58,18 @@ if (SANITIZE)
endif ()
elseif (SANITIZE STREQUAL "undefined")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SAN_FLAGS} -fsanitize=undefined -fno-sanitize-recover=all -fno-sanitize=float-divide-by-zero -fsanitize-blacklist=${CMAKE_SOURCE_DIR}/tests/ubsan_suppressions.txt")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${SAN_FLAGS} -fsanitize=undefined -fno-sanitize-recover=all -fno-sanitize=float-divide-by-zero -fsanitize-blacklist=${CMAKE_SOURCE_DIR}/tests/ubsan_suppressions.txt")
set (UBSAN_FLAGS "-fsanitize=undefined -fno-sanitize-recover=all -fno-sanitize=float-divide-by-zero")
if (COMPILER_CLANG)
set (UBSAN_FLAGS "${UBSAN_FLAGS} -fsanitize-blacklist=${CMAKE_SOURCE_DIR}/tests/ubsan_suppressions.txt")
else()
set (MESSAGE "UBSAN suppressions was not passed to the compiler (since the compiler is not clang)\n")
set (MESSAGE "${MESSAGE}Use the following command to pass them manually:\n")
set (MESSAGE "${MESSAGE} export UBSAN_OPTIONS=\"$UBSAN_OPTIONS suppressions=${CMAKE_SOURCE_DIR}/tests/ubsan_suppressions.txt\"")
message (WARNING "${MESSAGE}")
endif()
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SAN_FLAGS} ${UBSAN_FLAGS}")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${SAN_FLAGS} ${UBSAN_FLAGS}")
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=undefined")
endif()

contrib/libunwind vendored (2 changes)

@ -1 +1 @@
Subproject commit 51b84d9b6d2548f1cbdcafe622d5a753853b6149
Subproject commit 8fe25d7dc70f2a4ea38c3e5a33fa9d4199b67a5a


@ -148,6 +148,10 @@ def parse_env_variables(build_type, compiler, sanitizer, package_type, image_typ
if split_binary:
cmake_flags.append('-DUSE_STATIC_LIBRARIES=0 -DSPLIT_SHARED_LIBRARIES=1 -DCLICKHOUSE_SPLIT_BINARY=1')
# We can't always build utils because they require too much space, but
# we have to build them at least in some way in CI. The split build is
# probably the least heavy disk-wise.
cmake_flags.append('-DENABLE_UTILS=1')
if clang_tidy:
cmake_flags.append('-DENABLE_CLANG_TIDY=1')


@ -15,6 +15,8 @@ For more information and documentation see https://clickhouse.yandex/.
$ docker run -d --name some-clickhouse-server --ulimit nofile=262144:262144 yandex/clickhouse-server
```
By default, ClickHouse will be accessible only via the Docker network. See the [networking section below](#networking).
### connect to it from a native client
```bash
$ docker run -it --rm --link some-clickhouse-server:clickhouse-server yandex/clickhouse-client --host clickhouse-server
@ -22,6 +24,70 @@ $ docker run -it --rm --link some-clickhouse-server:clickhouse-server yandex/cli
More information about [ClickHouse client](https://clickhouse.yandex/docs/en/interfaces/cli/).
### connect to it using curl
```bash
echo "SELECT 'Hello, ClickHouse!'" | docker run -i --rm --link some-clickhouse-server:clickhouse-server curlimages/curl 'http://clickhouse-server:8123/?query=' -s --data-binary @-
```
More information about [ClickHouse HTTP Interface](https://clickhouse.tech/docs/en/interfaces/http/).
### stopping / removing the container
```bash
$ docker stop some-clickhouse-server
$ docker rm some-clickhouse-server
```
### networking
You can expose your ClickHouse instance running in Docker by [mapping a particular port](https://docs.docker.com/config/containers/container-networking/) from inside the container to host ports:
```bash
$ docker run -d -p 18123:8123 -p19000:9000 --name some-clickhouse-server --ulimit nofile=262144:262144 yandex/clickhouse-server
$ echo 'SELECT version()' | curl 'http://localhost:18123/' --data-binary @-
20.12.3.3
```
or by allowing the container to use [host ports directly](https://docs.docker.com/network/host/) with `--network=host` (which also allows achieving better network performance):
```bash
$ docker run -d --network=host --name some-clickhouse-server --ulimit nofile=262144:262144 yandex/clickhouse-server
$ echo 'SELECT version()' | curl 'http://localhost:8123/' --data-binary @-
20.12.3.3
```
### Volumes
Typically you may want to mount the following folders inside your container to achieve persistence:
* `/var/lib/clickhouse/` - main folder where ClickHouse stores the data
* `/var/log/clickhouse-server/` - logs
```bash
$ docker run -d \
-v $(realpath ./ch_data):/var/lib/clickhouse/ \
-v $(realpath ./ch_logs):/var/log/clickhouse-server/ \
--name some-clickhouse-server --ulimit nofile=262144:262144 yandex/clickhouse-server
```
You may also want to mount:
* `/etc/clickhouse-server/config.d/*.xml` - files with server configuration adjustments
* `/etc/clickhouse-server/users.d/*.xml` - files with user settings adjustments
* `/docker-entrypoint-initdb.d/` - folder with database initialization scripts (see below).
### Linux capabilities
ClickHouse has some advanced functionality which requires enabling several [Linux capabilities](https://man7.org/linux/man-pages/man7/capabilities.7.html).
It is optional and can be enabled using the following [docker command line arguments](https://docs.docker.com/engine/reference/run/#runtime-privilege-and-linux-capabilities):
```bash
$ docker run -d \
--cap-add=SYS_NICE --cap-add=NET_ADMIN --cap-add=IPC_LOCK \
--name some-clickhouse-server --ulimit nofile=262144:262144 yandex/clickhouse-server
```
## Configuration
The container exposes port 8123 for the [HTTP interface](https://clickhouse.yandex/docs/en/interfaces/http_interface/) and port 9000 for the [native client](https://clickhouse.yandex/docs/en/interfaces/tcp/).


@ -3,6 +3,7 @@
<mysql_port remove="remove"/>
<interserver_http_port remove="remove"/>
<tcp_with_proxy_port remove="remove"/>
<test_keeper_server remove="remove"/>
<listen_host>::</listen_host>
<logger>


@ -33,7 +33,7 @@ struct AvgFraction
/// Allow division by zero as sometimes we need to return NaN.
/// Invoked only if either Numerator or Denominator is Decimal.
Float64 NO_SANITIZE_UNDEFINED divideIfAnyDecimal(UInt32 num_scale, UInt32 denom_scale) const
Float64 NO_SANITIZE_UNDEFINED divideIfAnyDecimal(UInt32 num_scale, UInt32 denom_scale [[maybe_unused]]) const
{
if constexpr (IsDecimalNumber<Numerator> && IsDecimalNumber<Denominator>)
{


@ -65,6 +65,7 @@ class IColumn;
M(UInt64, distributed_connections_pool_size, DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE, "Maximum number of connections with one remote server in the pool.", 0) \
M(UInt64, connections_with_failover_max_tries, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, "The maximum number of attempts to connect to replicas.", 0) \
M(UInt64, s3_min_upload_part_size, 512*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_max_single_part_upload_size, 64*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
M(Bool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.", 0) \
@ -400,6 +401,7 @@ class IColumn;
M(Bool, enable_global_with_statement, false, "Propagate WITH statements to UNION queries and all subqueries", 0) \
M(Bool, aggregate_functions_null_for_empty, false, "Rewrite all aggregate functions in a query, adding -OrNull suffix to them", 0) \
M(Bool, optimize_skip_merged_partitions, false, "Skip partitions with one part with level > 0 in optimize final", 0) \
M(Bool, optimize_on_insert, true, "Do the same transformation for inserted block of data as if merge was done on this block.", 0) \
\
M(Bool, use_antlr_parser, false, "Parse incoming queries using ANTLR-generated parser", 0) \
\


@ -30,7 +30,6 @@ struct SortCursorImpl
ColumnRawPtrs all_columns;
SortDescription desc;
size_t sort_columns_size = 0;
size_t pos = 0;
size_t rows = 0;
/** Determines order if comparing columns are equal.
@ -49,15 +48,20 @@ struct SortCursorImpl
/** Is there at least one column with Collator. */
bool has_collation = false;
/** We could use SortCursorImpl in the case when columns aren't sorted
* but we have their sorted permutation
*/
IColumn::Permutation * permutation = nullptr;
SortCursorImpl() {}
SortCursorImpl(const Block & block, const SortDescription & desc_, size_t order_ = 0)
SortCursorImpl(const Block & block, const SortDescription & desc_, size_t order_ = 0, IColumn::Permutation * perm = nullptr)
: desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size())
{
reset(block);
reset(block, perm);
}
SortCursorImpl(const Columns & columns, const SortDescription & desc_, size_t order_ = 0)
SortCursorImpl(const Columns & columns, const SortDescription & desc_, size_t order_ = 0, IColumn::Permutation * perm = nullptr)
: desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size())
{
for (auto & column_desc : desc)
@ -66,19 +70,19 @@ struct SortCursorImpl
throw Exception("SortDescription should contain column position if SortCursor was used without header.",
ErrorCodes::LOGICAL_ERROR);
}
reset(columns, {});
reset(columns, {}, perm);
}
bool empty() const { return rows == 0; }
/// Set the cursor to the beginning of the new block.
void reset(const Block & block)
void reset(const Block & block, IColumn::Permutation * perm = nullptr)
{
reset(block.getColumns(), block);
reset(block.getColumns(), block, perm);
}
/// Set the cursor to the beginning of the new block.
void reset(const Columns & columns, const Block & block)
void reset(const Columns & columns, const Block & block, IColumn::Permutation * perm = nullptr)
{
all_columns.clear();
sort_columns.clear();
@ -96,18 +100,33 @@ struct SortCursorImpl
: column_desc.column_number;
sort_columns.push_back(columns[column_number].get());
need_collation[j] = desc[j].collator != nullptr && sort_columns.back()->isCollationSupported(); /// TODO Nullable(String)
need_collation[j] = desc[j].collator != nullptr && sort_columns.back()->isCollationSupported();
has_collation |= need_collation[j];
}
pos = 0;
rows = all_columns[0]->size();
permutation = perm;
}
size_t getRow() const
{
if (permutation)
return (*permutation)[pos];
return pos;
}
/// We need a possibility to change pos (see MergeJoin).
size_t & getPosRef() { return pos; }
bool isFirst() const { return pos == 0; }
bool isLast() const { return pos + 1 >= rows; }
bool isValid() const { return pos < rows; }
void next() { ++pos; }
/// Prevent using pos instead of getRow()
private:
size_t pos;
};
using SortCursorImpls = std::vector<SortCursorImpl>;
@ -127,7 +146,7 @@ struct SortCursorHelper
bool ALWAYS_INLINE greater(const SortCursorHelper & rhs) const
{
return derived().greaterAt(rhs.derived(), impl->pos, rhs.impl->pos);
return derived().greaterAt(rhs.derived(), impl->getRow(), rhs.impl->getRow());
}
/// Inverted so that the priority queue elements are removed in ascending order.


@ -222,7 +222,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, TSort
// std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
// std::cerr << "Inserting row\n";
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
merged_columns[i]->insertFrom(*current->all_columns[i], current->getRow());
if (out_row_sources_buf)
{


@ -348,11 +348,11 @@ public:
DiskS3::Metadata metadata_,
const String & s3_path_,
std::optional<DiskS3::ObjectMetadata> object_metadata_,
bool is_multipart,
size_t min_upload_part_size,
size_t max_single_part_upload_size,
size_t buf_size_)
: WriteBufferFromFileBase(buf_size_, nullptr, 0)
, impl(WriteBufferFromS3(client_ptr_, bucket_, metadata_.s3_root_path + s3_path_, min_upload_part_size, is_multipart,std::move(object_metadata_), buf_size_))
, impl(WriteBufferFromS3(client_ptr_, bucket_, metadata_.s3_root_path + s3_path_, min_upload_part_size, max_single_part_upload_size,std::move(object_metadata_), buf_size_))
, metadata(std::move(metadata_))
, s3_path(s3_path_)
{
@ -542,7 +542,7 @@ DiskS3::DiskS3(
String s3_root_path_,
String metadata_path_,
size_t min_upload_part_size_,
size_t min_multi_part_upload_size_,
size_t max_single_part_upload_size_,
size_t min_bytes_for_seek_,
bool send_metadata_)
: IDisk(std::make_unique<AsyncExecutor>())
@ -553,7 +553,7 @@ DiskS3::DiskS3(
, s3_root_path(std::move(s3_root_path_))
, metadata_path(std::move(metadata_path_))
, min_upload_part_size(min_upload_part_size_)
, min_multi_part_upload_size(min_multi_part_upload_size_)
, max_single_part_upload_size(max_single_part_upload_size_)
, min_bytes_for_seek(min_bytes_for_seek_)
, send_metadata(send_metadata_)
{
@ -665,7 +665,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, si
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), min_bytes_for_seek);
}
std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t estimated_size, size_t)
std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t, size_t)
{
bool exist = exists(path);
if (exist && readMeta(path).read_only)
@ -674,7 +674,6 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
/// Path to store new S3 object.
auto s3_path = getRandomName();
auto object_metadata = createObjectMetadata(path);
bool is_multipart = estimated_size >= min_multi_part_upload_size;
if (!exist || mode == WriteMode::Rewrite)
{
/// If metadata file exists - remove and create new.
@ -687,7 +686,8 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Write to file by path: {}. New S3 path: {}", backQuote(metadata_path + path), s3_root_path + s3_path);
return std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata, s3_path, object_metadata, is_multipart, min_upload_part_size, buf_size);
return std::make_unique<WriteIndirectBufferFromS3>(
client, bucket, metadata, s3_path, object_metadata, min_upload_part_size, max_single_part_upload_size, buf_size);
}
else
{
@ -696,7 +696,8 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Append to file by path: {}. New S3 path: {}. Existing S3 objects: {}.",
backQuote(metadata_path + path), s3_root_path + s3_path, metadata.s3_objects.size());
return std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata, s3_path, object_metadata, is_multipart, min_upload_part_size, buf_size);
return std::make_unique<WriteIndirectBufferFromS3>(
client, bucket, metadata, s3_path, object_metadata, min_upload_part_size, max_single_part_upload_size, buf_size);
}
}


@ -34,7 +34,7 @@ public:
String s3_root_path_,
String metadata_path_,
size_t min_upload_part_size_,
size_t min_multi_part_upload_size_,
size_t max_single_part_upload_size_,
size_t min_bytes_for_seek_,
bool send_metadata_);
@ -133,7 +133,7 @@ private:
const String s3_root_path;
const String metadata_path;
size_t min_upload_part_size;
size_t min_multi_part_upload_size;
size_t max_single_part_upload_size;
size_t min_bytes_for_seek;
bool send_metadata;


@ -148,7 +148,7 @@ void registerDiskS3(DiskFactory & factory)
uri.key,
metadata_path,
context.getSettingsRef().s3_min_upload_part_size,
config.getUInt64(config_prefix + ".min_multi_part_upload_size", 10 * 1024 * 1024),
context.getSettingsRef().s3_max_single_part_upload_size,
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getBool(config_prefix + ".send_object_metadata", false));


@ -368,8 +368,12 @@ namespace S3
throw Exception(
"Bucket name length is out of bounds in virtual hosted style S3 URI: " + bucket + " (" + uri.toString() + ")", ErrorCodes::BAD_ARGUMENTS);
/// Remove leading '/' from path to extract key.
key = uri.getPath().substr(1);
if (!uri.getPath().empty())
{
/// Remove leading '/' from path to extract key.
key = uri.getPath().substr(1);
}
if (key.empty() || key == "/")
throw Exception("Key name is empty in virtual hosted style S3 URI: " + key + " (" + uri.toString() + ")", ErrorCodes::BAD_ARGUMENTS);
boost::to_upper(name);


@ -5,12 +5,9 @@
# include <IO/WriteBufferFromS3.h>
# include <IO/WriteHelpers.h>
# include <aws/core/utils/memory/stl/AWSStreamFwd.h>
# include <aws/core/utils/memory/stl/AWSStringStream.h>
# include <aws/s3/S3Client.h>
# include <aws/s3/model/CompleteMultipartUploadRequest.h>
# include <aws/s3/model/AbortMultipartUploadRequest.h>
# include <aws/s3/model/CreateMultipartUploadRequest.h>
# include <aws/s3/model/CompleteMultipartUploadRequest.h>
# include <aws/s3/model/PutObjectRequest.h>
# include <aws/s3/model/UploadPartRequest.h>
# include <common/logger_useful.h>
@ -42,22 +39,19 @@ WriteBufferFromS3::WriteBufferFromS3(
const String & bucket_,
const String & key_,
size_t minimum_upload_part_size_,
bool is_multipart_,
size_t max_single_part_upload_size_,
std::optional<std::map<String, String>> object_metadata_,
size_t buffer_size_)
: BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0)
, is_multipart(is_multipart_)
, bucket(bucket_)
, key(key_)
, object_metadata(std::move(object_metadata_))
, client_ptr(std::move(client_ptr_))
, minimum_upload_part_size{minimum_upload_part_size_}
, temporary_buffer{std::make_unique<WriteBufferFromOwnString>()}
, last_part_size{0}
{
if (is_multipart)
initiate();
}
, minimum_upload_part_size(minimum_upload_part_size_)
, max_single_part_upload_size(max_single_part_upload_size_)
, temporary_buffer(Aws::MakeShared<Aws::StringStream>("temporary buffer"))
, last_part_size(0)
{ }
void WriteBufferFromS3::nextImpl()
{
@ -68,16 +62,17 @@ void WriteBufferFromS3::nextImpl()
ProfileEvents::increment(ProfileEvents::S3WriteBytes, offset());
if (is_multipart)
{
last_part_size += offset();
last_part_size += offset();
if (last_part_size > minimum_upload_part_size)
{
writePart(temporary_buffer->str());
last_part_size = 0;
temporary_buffer->restart();
}
/// Data size exceeds singlepart upload threshold, need to use multipart upload.
if (multipart_upload_id.empty() && last_part_size > max_single_part_upload_size)
createMultipartUpload();
if (!multipart_upload_id.empty() && last_part_size > minimum_upload_part_size)
{
writePart();
last_part_size = 0;
temporary_buffer = Aws::MakeShared<Aws::StringStream>("temporary buffer");
}
}
@ -88,17 +83,23 @@ void WriteBufferFromS3::finalize()
void WriteBufferFromS3::finalizeImpl()
{
if (!finalized)
if (finalized)
return;
next();
if (multipart_upload_id.empty())
{
next();
if (is_multipart)
writePart(temporary_buffer->str());
complete();
finalized = true;
makeSinglepartUpload();
}
else
{
/// Write rest of the data as last part.
writePart();
completeMultipartUpload();
}
finalized = true;
}
WriteBufferFromS3::~WriteBufferFromS3()
@ -113,7 +114,7 @@ WriteBufferFromS3::~WriteBufferFromS3()
}
}
void WriteBufferFromS3::initiate()
void WriteBufferFromS3::createMultipartUpload()
{
Aws::S3::Model::CreateMultipartUploadRequest req;
req.SetBucket(bucket);
@ -125,17 +126,17 @@ void WriteBufferFromS3::initiate()
if (outcome.IsSuccess())
{
upload_id = outcome.GetResult().GetUploadId();
LOG_DEBUG(log, "Multipart upload initiated. Upload id: {}", upload_id);
multipart_upload_id = outcome.GetResult().GetUploadId();
LOG_DEBUG(log, "Multipart upload has created. Upload id: {}", multipart_upload_id);
}
else
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
void WriteBufferFromS3::writePart(const String & data)
void WriteBufferFromS3::writePart()
{
if (data.empty())
if (temporary_buffer->tellp() <= 0)
return;
if (part_tags.size() == S3_WARN_MAX_PARTS)
@ -149,93 +150,71 @@ void WriteBufferFromS3::writePart(const String & data)
req.SetBucket(bucket);
req.SetKey(key);
req.SetPartNumber(part_tags.size() + 1);
req.SetUploadId(upload_id);
req.SetContentLength(data.size());
req.SetBody(std::make_shared<Aws::StringStream>(data));
req.SetUploadId(multipart_upload_id);
req.SetContentLength(temporary_buffer->tellp());
req.SetBody(temporary_buffer);
auto outcome = client_ptr->UploadPart(req);
LOG_TRACE(log, "Writing part. Bucket: {}, Key: {}, Upload_id: {}, Data size: {}", bucket, key, upload_id, data.size());
LOG_TRACE(log, "Writing part. Bucket: {}, Key: {}, Upload_id: {}, Data size: {}", bucket, key, multipart_upload_id, temporary_buffer->tellp());
if (outcome.IsSuccess())
{
auto etag = outcome.GetResult().GetETag();
part_tags.push_back(etag);
LOG_DEBUG(log, "Writing part finished. Total parts: {}, Upload_id: {}, Etag: {}", part_tags.size(), upload_id, etag);
LOG_DEBUG(log, "Writing part finished. Total parts: {}, Upload_id: {}, Etag: {}", part_tags.size(), multipart_upload_id, etag);
}
else
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
void WriteBufferFromS3::complete()
void WriteBufferFromS3::completeMultipartUpload()
{
if (is_multipart)
LOG_DEBUG(log, "Completing multipart upload. Bucket: {}, Key: {}, Upload_id: {}", bucket, key, multipart_upload_id);
Aws::S3::Model::CompleteMultipartUploadRequest req;
req.SetBucket(bucket);
req.SetKey(key);
req.SetUploadId(multipart_upload_id);
Aws::S3::Model::CompletedMultipartUpload multipart_upload;
for (size_t i = 0; i < part_tags.size(); ++i)
{
if (part_tags.empty())
{
LOG_DEBUG(log, "Multipart upload has no data. Aborting it. Bucket: {}, Key: {}, Upload_id: {}", bucket, key, upload_id);
Aws::S3::Model::AbortMultipartUploadRequest req;
req.SetBucket(bucket);
req.SetKey(key);
req.SetUploadId(upload_id);
auto outcome = client_ptr->AbortMultipartUpload(req);
if (outcome.IsSuccess())
LOG_DEBUG(log, "Aborting multipart upload completed. Upload_id: {}", upload_id);
else
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
return;
}
LOG_DEBUG(log, "Completing multipart upload. Bucket: {}, Key: {}, Upload_id: {}", bucket, key, upload_id);
Aws::S3::Model::CompleteMultipartUploadRequest req;
req.SetBucket(bucket);
req.SetKey(key);
req.SetUploadId(upload_id);
Aws::S3::Model::CompletedMultipartUpload multipart_upload;
for (size_t i = 0; i < part_tags.size(); ++i)
{
Aws::S3::Model::CompletedPart part;
multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(i + 1));
}
req.SetMultipartUpload(multipart_upload);
auto outcome = client_ptr->CompleteMultipartUpload(req);
if (outcome.IsSuccess())
LOG_DEBUG(log, "Multipart upload completed. Upload_id: {}", upload_id);
else
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
Aws::S3::Model::CompletedPart part;
multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(i + 1));
}
req.SetMultipartUpload(multipart_upload);
auto outcome = client_ptr->CompleteMultipartUpload(req);
if (outcome.IsSuccess())
LOG_DEBUG(log, "Multipart upload has completed. Upload_id: {}", multipart_upload_id);
else
{
LOG_DEBUG(log, "Making single part upload. Bucket: {}, Key: {}", bucket, key);
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
Aws::S3::Model::PutObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
if (object_metadata.has_value())
req.SetMetadata(object_metadata.value());
void WriteBufferFromS3::makeSinglepartUpload()
{
if (temporary_buffer->tellp() <= 0)
return;
/// This could be improved using an adapter to WriteBuffer.
const std::shared_ptr<Aws::IOStream> input_data = Aws::MakeShared<Aws::StringStream>("temporary buffer", temporary_buffer->str());
temporary_buffer = std::make_unique<WriteBufferFromOwnString>();
req.SetBody(input_data);
LOG_DEBUG(log, "Making single part upload. Bucket: {}, Key: {}", bucket, key);
auto outcome = client_ptr->PutObject(req);
Aws::S3::Model::PutObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
req.SetContentLength(temporary_buffer->tellp());
req.SetBody(temporary_buffer);
if (object_metadata.has_value())
req.SetMetadata(object_metadata.value());
if (outcome.IsSuccess())
LOG_DEBUG(log, "Single part upload has completed. Bucket: {}, Key: {}", bucket, key);
else
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
auto outcome = client_ptr->PutObject(req);
if (outcome.IsSuccess())
LOG_DEBUG(log, "Single part upload has completed. Bucket: {}, Key: {}", bucket, key);
else
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
}


@ -6,11 +6,13 @@
# include <memory>
# include <vector>
# include <common/logger_useful.h>
# include <common/types.h>
# include <IO/BufferWithOwnMemory.h>
# include <IO/HTTPCommon.h>
# include <IO/WriteBuffer.h>
# include <IO/WriteBufferFromString.h>
# include <aws/core/utils/memory/stl/AWSStringStream.h>
namespace Aws::S3
{
@ -19,24 +21,30 @@ class S3Client;
namespace DB
{
/* Perform S3 HTTP PUT request.
/**
* Buffer to write data to an S3 object with the specified bucket and key.
* If the data size written to the buffer is less than 'max_single_part_upload_size', the write is performed using a singlepart upload.
* Otherwise, multipart upload is used:
* the data is divided into chunks with size greater than 'minimum_upload_part_size' (the last chunk can be smaller than this threshold),
* and each chunk is written to S3 as a separate part.
*/
class WriteBufferFromS3 : public BufferWithOwnMemory<WriteBuffer>
{
private:
bool is_multipart;
String bucket;
String key;
std::optional<std::map<String, String>> object_metadata;
std::shared_ptr<Aws::S3::S3Client> client_ptr;
size_t minimum_upload_part_size;
std::unique_ptr<WriteBufferFromOwnString> temporary_buffer;
size_t max_single_part_upload_size;
/// Buffer to accumulate data.
std::shared_ptr<Aws::StringStream> temporary_buffer;
size_t last_part_size;
/// Upload to S3 is made in parts.
/// We initiate the upload, then upload each part and get an ETag as a response, and then finish the upload by listing all our parts.
String upload_id;
String multipart_upload_id;
std::vector<String> part_tags;
Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3");
@ -47,7 +55,7 @@ public:
const String & bucket_,
const String & key_,
size_t minimum_upload_part_size_,
bool is_multipart,
size_t max_single_part_upload_size_,
std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
@ -61,9 +69,11 @@ public:
private:
bool finalized = false;
void initiate();
void writePart(const String & data);
void complete();
void createMultipartUpload();
void writePart();
void completeMultipartUpload();
void makeSinglepartUpload();
void finalizeImpl();
};
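
The class comment above describes the upload strategy: data is buffered, stays on the singlepart path while the accumulated size is below `max_single_part_upload_size`, and otherwise switches to a multipart upload that flushes parts of at least `minimum_upload_part_size`. Below is a minimal, self-contained sketch of that decision logic only; `FakeS3` and `SketchWriteBuffer` are hypothetical stand-ins for the real AWS SDK calls and the real buffer class, not the committed implementation.

```cpp
#include <cstddef>
#include <iostream>
#include <string>

/// Hypothetical stand-ins for the real S3 requests; they only illustrate the control flow.
struct FakeS3
{
    void putObject(const std::string & data) { std::cout << "singlepart PUT of " << data.size() << " bytes\n"; }
    void createMultipartUpload() { std::cout << "multipart upload created\n"; }
    void uploadPart(const std::string & data) { std::cout << "uploaded part of " << data.size() << " bytes\n"; }
    void completeMultipartUpload() { std::cout << "multipart upload completed\n"; }
};

/// Sketch: buffer data; once the buffered size exceeds max_single_part_upload_size,
/// switch to multipart mode and flush parts of at least minimum_upload_part_size.
class SketchWriteBuffer
{
public:
    SketchWriteBuffer(size_t min_part_, size_t max_single_)
        : minimum_upload_part_size(min_part_), max_single_part_upload_size(max_single_) {}

    void write(const std::string & chunk)
    {
        buffer += chunk;
        if (!multipart_started && buffer.size() > max_single_part_upload_size)
        {
            s3.createMultipartUpload();
            multipart_started = true;
        }
        if (multipart_started && buffer.size() > minimum_upload_part_size)
        {
            s3.uploadPart(buffer);
            buffer.clear();
        }
    }

    void finalize()
    {
        if (!multipart_started)
            s3.putObject(buffer);        /// Everything fit below the singlepart threshold.
        else
        {
            s3.uploadPart(buffer);       /// Rest of the data goes out as the last part.
            s3.completeMultipartUpload();
        }
    }

private:
    FakeS3 s3;
    std::string buffer;
    size_t minimum_upload_part_size;
    size_t max_single_part_upload_size;
    bool multipart_started = false;
};

int main()
{
    SketchWriteBuffer buf(/* min_part = */ 16, /* max_single = */ 32);
    for (int i = 0; i < 5; ++i)
        buf.write("0123456789");         /// 50 bytes total: crosses the singlepart threshold.
    buf.finalize();
}
```

With the defaults introduced in this commit (`s3_min_upload_part_size` = 512 MiB, `s3_max_single_part_upload_size` = 64 MiB), small objects go out as a single PUT and only larger streams fall back to multipart upload.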


@ -249,6 +249,19 @@ ReturnType readFloatTextPreciseImpl(T & x, ReadBuffer & buf)
}
// credit: https://johnnylee-sde.github.io/Fast-numeric-string-to-int/
static inline bool is_made_of_eight_digits_fast(uint64_t val) noexcept
{
return (((val & 0xF0F0F0F0F0F0F0F0) | (((val + 0x0606060606060606) & 0xF0F0F0F0F0F0F0F0) >> 4)) == 0x3333333333333333);
}
static inline bool is_made_of_eight_digits_fast(const char * chars) noexcept
{
uint64_t val;
::memcpy(&val, chars, 8);
return is_made_of_eight_digits_fast(val);
}
template <size_t N, typename T>
static inline void readUIntTextUpToNSignificantDigits(T & x, ReadBuffer & buf)
{
@ -266,9 +279,6 @@ static inline void readUIntTextUpToNSignificantDigits(T & x, ReadBuffer & buf)
else
return;
}
while (!buf.eof() && isNumericASCII(*buf.position()))
++buf.position();
}
else
{
@ -283,10 +293,16 @@ static inline void readUIntTextUpToNSignificantDigits(T & x, ReadBuffer & buf)
else
return;
}
while (!buf.eof() && isNumericASCII(*buf.position()))
++buf.position();
}
while (!buf.eof() && (buf.position() + 8 <= buf.buffer().end()) &&
is_made_of_eight_digits_fast(buf.position()))
{
buf.position() += 8;
}
while (!buf.eof() && isNumericASCII(*buf.position()))
++buf.position();
}
@ -319,7 +335,6 @@ ReturnType readFloatTextFastImpl(T & x, ReadBuffer & in)
++in.position();
}
auto count_after_sign = in.count();
constexpr int significant_digits = std::numeric_limits<UInt64>::digits10;
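
The `is_made_of_eight_digits_fast` helper added above is a SWAR check: a byte is an ASCII digit exactly when its high nibble is 3 and adding 0x06 does not push the high nibble past 3 (it does for 0x3A-0x3F, i.e. ':' through '?'). The following standalone sanity check of the trick is illustrative only; the function and test strings are not part of the commit.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

/// Same trick as above: true iff all 8 bytes are ASCII digits '0'..'9'.
static bool all_eight_digits(const char * chars)
{
    uint64_t val;
    std::memcpy(&val, chars, 8);
    return (((val & 0xF0F0F0F0F0F0F0F0ULL)
        | (((val + 0x0606060606060606ULL) & 0xF0F0F0F0F0F0F0F0ULL) >> 4))
        == 0x3333333333333333ULL);
}

int main()
{
    assert(all_eight_digits("12345678"));   /// all digits
    assert(!all_eight_digits("1234567a"));  /// 'a' fails the high-nibble test
    assert(!all_eight_digits("1234567:"));  /// ':' is 0x3A: high nibble 3, but +0x06 makes it 4
    assert(!all_eight_digits("1234 678"));  /// space is 0x20
}
```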


@ -1552,7 +1552,12 @@ void InterpreterSelectQuery::executeFetchColumns(
throw Exception("Logical error in InterpreterSelectQuery: nowhere to read", ErrorCodes::LOGICAL_ERROR);
/// Specify the number of threads only if it wasn't specified in storage.
if (!query_plan.getMaxThreads())
///
/// But in the case of a remote query and prefer_localhost_replica=1 (default),
/// the inner local query (which is executed in the same process, without
/// network interaction) will call setMaxThreads earlier, and the distributed
/// query will not update it.
if (!query_plan.getMaxThreads() || is_remote)
query_plan.setMaxThreads(max_threads_execute_query);
/// Aliases in table declaration.


@ -128,24 +128,24 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
const ASTPtr & query_ptr_, const Context & context_, const SelectQueryOptions & options_, const Names & required_result_column_names)
: IInterpreterUnionOrSelectQuery(query_ptr_, context_, options_)
{
auto & ast = query_ptr->as<ASTSelectWithUnionQuery &>();
ASTSelectWithUnionQuery * ast = query_ptr->as<ASTSelectWithUnionQuery>();
/// Normalize AST Tree
if (!ast.is_normalized)
if (!ast->is_normalized)
{
CustomizeASTSelectWithUnionQueryNormalizeVisitor::Data union_default_mode{context->getSettingsRef().union_default_mode};
CustomizeASTSelectWithUnionQueryNormalizeVisitor(union_default_mode).visit(query_ptr);
/// After normalization, if it only has one ASTSelectWithUnionQuery child,
/// we can lift it up; this avoids one unnecessary recursion later.
if (ast.list_of_selects->children.size() == 1 && ast.list_of_selects->children.at(0)->as<ASTSelectWithUnionQuery>())
if (ast->list_of_selects->children.size() == 1 && ast->list_of_selects->children.at(0)->as<ASTSelectWithUnionQuery>())
{
query_ptr = std::move(ast.list_of_selects->children.at(0));
ast = query_ptr->as<ASTSelectWithUnionQuery &>();
query_ptr = std::move(ast->list_of_selects->children.at(0));
ast = query_ptr->as<ASTSelectWithUnionQuery>();
}
}
size_t num_children = ast.list_of_selects->children.size();
size_t num_children = ast->list_of_selects->children.size();
if (!num_children)
throw Exception("Logical error: no children in ASTSelectWithUnionQuery", ErrorCodes::LOGICAL_ERROR);
@ -161,7 +161,7 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
/// Result header if there are no filtering by 'required_result_column_names'.
/// We use it to determine positions of 'required_result_column_names' in SELECT clause.
Block full_result_header = getCurrentChildResultHeader(ast.list_of_selects->children.at(0), required_result_column_names);
Block full_result_header = getCurrentChildResultHeader(ast->list_of_selects->children.at(0), required_result_column_names);
std::vector<size_t> positions_of_required_result_columns(required_result_column_names.size());
@ -171,7 +171,7 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
for (size_t query_num = 1; query_num < num_children; ++query_num)
{
Block full_result_header_for_current_select
= getCurrentChildResultHeader(ast.list_of_selects->children.at(query_num), required_result_column_names);
= getCurrentChildResultHeader(ast->list_of_selects->children.at(query_num), required_result_column_names);
if (full_result_header_for_current_select.columns() != full_result_header.columns())
throw Exception("Different number of columns in UNION ALL elements:\n"
@ -192,7 +192,7 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
= query_num == 0 ? required_result_column_names : required_result_column_names_for_other_selects[query_num];
nested_interpreters.emplace_back(
buildCurrentChildInterpreter(ast.list_of_selects->children.at(query_num), current_required_result_column_names));
buildCurrentChildInterpreter(ast->list_of_selects->children.at(query_num), current_required_result_column_names));
}
/// Determine structure of the result.


@ -180,12 +180,16 @@ class MergeJoinCursor
public:
MergeJoinCursor(const Block & block, const SortDescription & desc_)
: impl(SortCursorImpl(block, desc_))
{}
{
/// SortCursorImpl can work with permutation, but MergeJoinCursor can't.
if (impl.permutation)
throw Exception("Logical error: MergeJoinCursor doesn't support permutation", ErrorCodes::LOGICAL_ERROR);
}
size_t position() const { return impl.pos; }
size_t position() const { return impl.getRow(); }
size_t end() const { return impl.rows; }
bool atEnd() const { return impl.pos >= impl.rows; }
void nextN(size_t num) { impl.pos += num; }
bool atEnd() const { return impl.getRow() >= impl.rows; }
void nextN(size_t num) { impl.getPosRef() += num; }
void setCompareNullability(const MergeJoinCursor & rhs)
{
@ -254,10 +258,10 @@ private:
else if (cmp > 0)
rhs.impl.next();
else if (!cmp)
return Range{impl.pos, rhs.impl.pos, getEqualLength(), rhs.getEqualLength()};
return Range{impl.getRow(), rhs.impl.getRow(), getEqualLength(), rhs.getEqualLength()};
}
return Range{impl.pos, rhs.impl.pos, 0, 0};
return Range{impl.getRow(), rhs.impl.getRow(), 0, 0};
}
template <bool left_nulls, bool right_nulls>
@ -268,7 +272,7 @@ private:
const auto * left_column = impl.sort_columns[i];
const auto * right_column = rhs.impl.sort_columns[i];
int res = nullableCompareAt<left_nulls, right_nulls>(*left_column, *right_column, impl.pos, rhs.impl.pos);
int res = nullableCompareAt<left_nulls, right_nulls>(*left_column, *right_column, impl.getRow(), rhs.impl.getRow());
if (res)
return res;
}
@ -278,11 +282,11 @@ private:
/// Expects !atEnd()
size_t getEqualLength()
{
size_t pos = impl.pos + 1;
size_t pos = impl.getRow() + 1;
for (; pos < impl.rows; ++pos)
if (!samePrev(pos))
break;
return pos - impl.pos;
return pos - impl.getRow();
}
/// Expects lhs_pos > 0


@ -449,12 +449,22 @@ bool ParserCreateTableQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expe
if (!s_rparen.ignore(pos, expected))
return false;
if (!storage_p.parse(pos, storage, expected) && !is_temporary)
auto storage_parse_result = storage_p.parse(pos, storage, expected);
if (storage_parse_result && s_as.ignore(pos, expected))
{
if (!select_p.parse(pos, select, expected))
return false;
}
if (!storage_parse_result && !is_temporary)
{
if (!s_as.ignore(pos, expected))
return false;
if (!table_function_p.parse(pos, as_table_function, expected))
{
return false;
}
}
}
else


@ -257,12 +257,12 @@ void AggregatingSortedAlgorithm::AggregatingMergedData::addRow(SortCursor & curs
throw Exception("Can't add a row to the group because it was not started.", ErrorCodes::LOGICAL_ERROR);
for (auto & desc : def.columns_to_aggregate)
desc.column->insertMergeFrom(*cursor->all_columns[desc.column_number], cursor->pos);
desc.column->insertMergeFrom(*cursor->all_columns[desc.column_number], cursor->getRow());
for (auto & desc : def.columns_to_simple_aggregate)
{
auto & col = cursor->all_columns[desc.column_number];
desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->pos, arena.get());
desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->getRow(), arena.get());
}
}
@ -352,7 +352,7 @@ IMergingAlgorithm::Status AggregatingSortedAlgorithm::merge()
return Status(merged_data.pull());
}
merged_data.startGroup(current->all_columns, current->pos);
merged_data.startGroup(current->all_columns, current->getRow());
}
merged_data.addRow(current);


@ -26,9 +26,9 @@ CollapsingSortedAlgorithm::CollapsingSortedAlgorithm(
const String & sign_column,
bool only_positive_sign_,
size_t max_block_size,
Poco::Logger * log_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes,
Poco::Logger * log_)
bool use_average_block_sizes)
: IMergingAlgorithmWithSharedChunks(num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs)
, merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
, sign_column_number(header.getPositionByName(sign_column))
@ -123,7 +123,7 @@ IMergingAlgorithm::Status CollapsingSortedAlgorithm::merge()
return Status(current.impl->order);
}
Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];
Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->getRow()];
RowRef current_row;
setRowRef(current_row, current);


@ -33,9 +33,9 @@ public:
const String & sign_column,
bool only_positive_sign_, /// For select final. Skip rows with sum(sign) < 0.
size_t max_block_size,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes,
Poco::Logger * log_);
Poco::Logger * log_,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);
Status merge() override;


@ -164,12 +164,12 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge()
return Status(current.impl->order);
}
StringRef next_path = current->all_columns[columns_definition.path_column_num]->getDataAt(current->pos);
StringRef next_path = current->all_columns[columns_definition.path_column_num]->getDataAt(current->getRow());
bool new_path = is_first || next_path != current_group_path;
is_first = false;
time_t next_row_time = current->all_columns[columns_definition.time_column_num]->getUInt(current->pos);
time_t next_row_time = current->all_columns[columns_definition.time_column_num]->getUInt(current->getRow());
/// Is new key before rounding.
bool is_new_key = new_path || next_row_time != current_time;
@ -227,7 +227,7 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge()
/// and for rows with same maximum version - only last row.
if (is_new_key
|| current->all_columns[columns_definition.version_column_num]->compareAt(
current->pos, current_subgroup_newest_row.row_num,
current->getRow(), current_subgroup_newest_row.row_num,
*(*current_subgroup_newest_row.all_columns)[columns_definition.version_column_num],
/* nan_direction_hint = */ 1) >= 0)
{
@ -263,7 +263,7 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge()
void GraphiteRollupSortedAlgorithm::startNextGroup(SortCursor & cursor, Graphite::RollupRule next_rule)
{
merged_data.startNextGroup(cursor->all_columns, cursor->pos, next_rule, columns_definition);
merged_data.startNextGroup(cursor->all_columns, cursor->getRow(), next_rule, columns_definition);
}
void GraphiteRollupSortedAlgorithm::finishCurrentGroup()


@ -29,6 +29,8 @@ public:
/// between different algorithm objects in parallel FINAL.
bool skip_last_row = false;
IColumn::Permutation * permutation = nullptr;
void swap(Input & other)
{
chunk.swap(other.chunk);


@ -22,7 +22,7 @@ void IMergingAlgorithmWithDelayedChunk::initializeQueue(Inputs inputs)
if (!current_inputs[source_num].chunk)
continue;
cursors[source_num] = SortCursorImpl(current_inputs[source_num].chunk.getColumns(), description, source_num);
cursors[source_num] = SortCursorImpl(current_inputs[source_num].chunk.getColumns(), description, source_num, current_inputs[source_num].permutation);
}
queue = SortingHeap<SortCursor>(cursors);
@ -37,7 +37,7 @@ void IMergingAlgorithmWithDelayedChunk::updateCursor(Input & input, size_t sourc
last_chunk_sort_columns = std::move(cursors[source_num].sort_columns);
current_input.swap(input);
cursors[source_num].reset(current_input.chunk.getColumns(), {});
cursors[source_num].reset(current_input.chunk.getColumns(), {}, current_input.permutation);
queue.push(cursors[source_num]);
}


@ -39,7 +39,7 @@ void IMergingAlgorithmWithSharedChunks::initialize(Inputs inputs)
source.skip_last_row = inputs[source_num].skip_last_row;
source.chunk = chunk_allocator.alloc(inputs[source_num].chunk);
cursors[source_num] = SortCursorImpl(source.chunk->getColumns(), description, source_num);
cursors[source_num] = SortCursorImpl(source.chunk->getColumns(), description, source_num, inputs[source_num].permutation);
source.chunk->all_columns = cursors[source_num].all_columns;
source.chunk->sort_columns = cursors[source_num].sort_columns;
@ -55,7 +55,7 @@ void IMergingAlgorithmWithSharedChunks::consume(Input & input, size_t source_num
auto & source = sources[source_num];
source.skip_last_row = input.skip_last_row;
source.chunk = chunk_allocator.alloc(input.chunk);
cursors[source_num].reset(source.chunk->getColumns(), {});
cursors[source_num].reset(source.chunk->getColumns(), {}, input.permutation);
source.chunk->all_columns = cursors[source_num].all_columns;
source.chunk->sort_columns = cursors[source_num].sort_columns;


@ -139,7 +139,7 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::mergeImpl(TSortingHeap & queue
//std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
//std::cerr << "Inserting row\n";
merged_data.insertRow(current->all_columns, current->pos, current->rows);
merged_data.insertRow(current->all_columns, current->getRow(), current->rows);
if (out_row_sources_buf)
{


@ -18,9 +18,9 @@ public:
size_t num_inputs,
SortDescription description_,
size_t max_block_size,
UInt64 limit_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes);
UInt64 limit_ = 0,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);
void addInput();


@ -73,7 +73,7 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
if (version_column_number == -1
|| selected_row.empty()
|| current->all_columns[version_column_number]->compareAt(
current->pos, selected_row.row_num,
current->getRow(), selected_row.row_num,
*(*selected_row.all_columns)[version_column_number],
/* nan_direction_hint = */ 1) >= 0)
{


@ -136,7 +136,7 @@ struct RowRef
{
sort_columns = cursor.impl->sort_columns.data();
num_columns = cursor.impl->sort_columns.size();
row_num = cursor.impl->pos;
row_num = cursor.impl->getRow();
}
static bool checkEquals(size_t size, const IColumn ** lhs, size_t lhs_row, const IColumn ** rhs, size_t rhs_row)
@ -192,7 +192,7 @@ struct RowRefWithOwnedChunk
void set(SortCursor & cursor, SharedChunkPtr chunk)
{
owned_chunk = std::move(chunk);
row_num = cursor.impl->pos;
row_num = cursor.impl->getRow();
all_columns = &owned_chunk->all_columns;
sort_columns = &owned_chunk->sort_columns;
}


@ -688,10 +688,10 @@ IMergingAlgorithm::Status SummingSortedAlgorithm::merge()
return Status(merged_data.pull());
}
merged_data.startGroup(current->all_columns, current->pos);
merged_data.startGroup(current->all_columns, current->getRow());
}
else
merged_data.addRow(current->all_columns, current->pos);
merged_data.addRow(current->all_columns, current->getRow());
if (!current->isLast())
{


@ -73,7 +73,7 @@ IMergingAlgorithm::Status VersionedCollapsingAlgorithm::merge()
RowRef current_row;
Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];
Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->getRow()];
setRowRef(current_row, current);


@ -27,9 +27,9 @@ public:
sign_column,
only_positive_sign,
max_block_size,
&Poco::Logger::get("CollapsingSortedTransform"),
out_row_sources_buf_,
use_average_block_sizes,
&Poco::Logger::get("CollapsingSortedTransform"))
use_average_block_sizes)
{
}


@ -100,7 +100,7 @@ Chunk MergeSorter::mergeImpl(TSortingHeap & queue)
/// Append a row from queue.
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
merged_columns[i]->insertFrom(*current->all_columns[i], current->getRow());
++total_merged_rows;
++merged_rows;


@ -22,7 +22,7 @@ void MergeTreeBlockOutputStream::write(const Block & block)
{
Stopwatch watch;
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot);
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot, optimize_on_insert);
storage.renameTempPartAndAdd(part, &storage.increment);
PartLog::addNewPart(storage.global_context, part, watch.elapsed());


@ -14,10 +14,11 @@ class StorageMergeTree;
class MergeTreeBlockOutputStream : public IBlockOutputStream
{
public:
MergeTreeBlockOutputStream(StorageMergeTree & storage_, const StorageMetadataPtr metadata_snapshot_, size_t max_parts_per_block_)
MergeTreeBlockOutputStream(StorageMergeTree & storage_, const StorageMetadataPtr metadata_snapshot_, size_t max_parts_per_block_, bool optimize_on_insert_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, max_parts_per_block(max_parts_per_block_)
, optimize_on_insert(optimize_on_insert_)
{
}
@ -28,6 +29,7 @@ private:
StorageMergeTree & storage;
StorageMetadataPtr metadata_snapshot;
size_t max_parts_per_block;
bool optimize_on_insert;
};
}


@ -16,6 +16,14 @@
#include <Parsers/queryToString.h>
#include <Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/MergingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/SummingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.h>
#include <Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.h>
namespace ProfileEvents
{
extern const Event MergeTreeDataWriterBlocks;
@ -194,7 +202,74 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block
return result;
}
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot)
Block MergeTreeDataWriter::mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation *& permutation)
{
size_t block_size = block.rows();
auto get_merging_algorithm = [&]() -> std::shared_ptr<IMergingAlgorithm>
{
switch (data.merging_params.mode)
{
/// There is nothing to merge in a single block in an ordinary MergeTree
case MergeTreeData::MergingParams::Ordinary:
return nullptr;
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedAlgorithm>(
block, 1, sort_description, data.merging_params.version_column, block_size + 1);
case MergeTreeData::MergingParams::Collapsing:
return std::make_shared<CollapsingSortedAlgorithm>(
block, 1, sort_description, data.merging_params.sign_column,
false, block_size + 1, &Poco::Logger::get("MergeTreeBlockOutputStream"));
case MergeTreeData::MergingParams::Summing:
return std::make_shared<SummingSortedAlgorithm>(
block, 1, sort_description, data.merging_params.columns_to_sum,
partition_key_columns, block_size + 1);
case MergeTreeData::MergingParams::Aggregating:
return std::make_shared<AggregatingSortedAlgorithm>(block, 1, sort_description, block_size + 1);
case MergeTreeData::MergingParams::VersionedCollapsing:
return std::make_shared<VersionedCollapsingAlgorithm>(
block, 1, sort_description, data.merging_params.sign_column, block_size + 1);
case MergeTreeData::MergingParams::Graphite:
return std::make_shared<GraphiteRollupSortedAlgorithm>(
block, 1, sort_description, block_size + 1, data.merging_params.graphite_params, time(nullptr));
}
__builtin_unreachable();
};
auto merging_algorithm = get_merging_algorithm();
if (!merging_algorithm)
return block;
Chunk chunk(block.getColumns(), block_size);
IMergingAlgorithm::Input input;
input.set(std::move(chunk));
input.permutation = permutation;
IMergingAlgorithm::Inputs inputs;
inputs.push_back(std::move(input));
merging_algorithm->initialize(std::move(inputs));
IMergingAlgorithm::Status status = merging_algorithm->merge();
/// Check that after the first merge the merging_algorithm is waiting for data from input 0.
if (status.required_source != 0)
throw Exception("Logical error: required source after the first merge is not 0.", ErrorCodes::LOGICAL_ERROR);
status = merging_algorithm->merge();
/// Check that merge is finished.
if (!status.is_finished)
throw Exception("Logical error: merge is not finished after the second merge.", ErrorCodes::LOGICAL_ERROR);
/// The merged block is sorted and we don't need the permutation anymore
permutation = nullptr;
return block.cloneWithColumns(status.chunk.getColumns());
}
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot, bool optimize_on_insert)
{
Block & block = block_with_partition.block;
@ -228,6 +303,38 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
else
part_name = new_part_info.getPartName();
/// If we need to calculate some columns to sort.
if (metadata_snapshot->hasSortingKey() || metadata_snapshot->hasSecondaryIndices())
data.getSortingKeyAndSkipIndicesExpression(metadata_snapshot)->execute(block);
Names sort_columns = metadata_snapshot->getSortingKeyColumns();
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(block.getPositionByName(sort_columns[i]), 1, 1);
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocks);
/// Sort
IColumn::Permutation * perm_ptr = nullptr;
IColumn::Permutation perm;
if (!sort_description.empty())
{
if (!isAlreadySorted(block, sort_description))
{
stableGetPermutation(block, sort_description, perm);
perm_ptr = &perm;
}
else
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocksAlreadySorted);
}
Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names;
if (optimize_on_insert)
block = mergeBlock(block, sort_description, partition_key_columns, perm_ptr);
/// Size of part would not be greater than block.bytes() + epsilon
size_t expected_size = block.bytes();
@ -274,34 +381,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
sync_guard.emplace(disk, full_path);
}
/// If we need to calculate some columns to sort.
if (metadata_snapshot->hasSortingKey() || metadata_snapshot->hasSecondaryIndices())
data.getSortingKeyAndSkipIndicesExpression(metadata_snapshot)->execute(block);
Names sort_columns = metadata_snapshot->getSortingKeyColumns();
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(block.getPositionByName(sort_columns[i]), 1, 1);
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocks);
/// Sort
IColumn::Permutation * perm_ptr = nullptr;
IColumn::Permutation perm;
if (!sort_description.empty())
{
if (!isAlreadySorted(block, sort_description))
{
stableGetPermutation(block, sort_description, perm);
perm_ptr = &perm;
}
else
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocksAlreadySorted);
}
if (metadata_snapshot->hasRowsTTL())
updateTTL(metadata_snapshot->getRowsTTL(), new_data_part->ttl_infos, new_data_part->ttl_infos.table_ttl, block, true);


@ -45,7 +45,9 @@ public:
/** All rows must correspond to the same partition.
* Returns a part with a unique name starting with 'tmp_', not yet added to MergeTreeData.
*/
MergeTreeData::MutableDataPartPtr writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot);
MergeTreeData::MutableDataPartPtr writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, bool optimize_on_insert);
Block mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation *& permutation);
private:
MergeTreeData & data;


@ -40,7 +40,8 @@ ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
size_t quorum_timeout_ms_,
size_t max_parts_per_block_,
bool quorum_parallel_,
bool deduplicate_)
bool deduplicate_,
bool optimize_on_insert_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, quorum(quorum_)
@ -49,6 +50,7 @@ ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
, quorum_parallel(quorum_parallel_)
, deduplicate(deduplicate_)
, log(&Poco::Logger::get(storage.getLogName() + " (Replicated OutputStream)"))
, optimize_on_insert(optimize_on_insert_)
{
/// The quorum value `1` has the same meaning as if it is disabled.
if (quorum == 1)
@ -142,7 +144,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
/// Write part to the filesystem under temporary name. Calculate a checksum.
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot);
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot, optimize_on_insert);
String block_id;


@ -29,7 +29,8 @@ public:
size_t quorum_timeout_ms_,
size_t max_parts_per_block_,
bool quorum_parallel_,
bool deduplicate_);
bool deduplicate_,
bool optimize_on_insert);
Block getHeader() const override;
void writePrefix() override;
@ -71,6 +72,8 @@ private:
using Logger = Poco::Logger;
Poco::Logger * log;
bool optimize_on_insert;
};
}


@ -233,7 +233,7 @@ BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Sto
const auto & settings = context.getSettingsRef();
return std::make_shared<MergeTreeBlockOutputStream>(
*this, metadata_snapshot, settings.max_partitions_per_insert_block);
*this, metadata_snapshot, settings.max_partitions_per_insert_block, context.getSettingsRef().optimize_on_insert);
}
void StorageMergeTree::checkTableCanBeDropped() const


@ -3861,7 +3861,8 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/,
query_settings.insert_quorum_timeout.totalMilliseconds(),
query_settings.max_partitions_per_insert_block,
query_settings.insert_quorum_parallel,
deduplicate);
deduplicate,
context.getSettingsRef().optimize_on_insert);
}
@ -4444,7 +4445,7 @@ PartitionCommandsResultInfo StorageReplicatedMergeTree::attachPartition(
PartsTemporaryRename renamed_parts(*this, "detached/");
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);
ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false, false); /// TODO Allow to use quorum here.
ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false, false, false); /// TODO Allow to use quorum here.
for (size_t i = 0; i < loaded_parts.size(); ++i)
{
String old_name = loaded_parts[i]->name;

View File

@ -141,17 +141,18 @@ namespace
public:
StorageS3BlockOutputStream(
const String & format,
UInt64 min_upload_part_size,
const Block & sample_block_,
const Context & context,
const CompressionMethod compression_method,
const std::shared_ptr<Aws::S3::S3Client> & client,
const String & bucket,
const String & key)
const String & key,
size_t min_upload_part_size,
size_t max_single_part_upload_size)
: sample_block(sample_block_)
{
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromS3>(client, bucket, key, min_upload_part_size, true), compression_method, 3);
std::make_unique<WriteBufferFromS3>(client, bucket, key, min_upload_part_size, max_single_part_upload_size), compression_method, 3);
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
}
@ -192,6 +193,7 @@ StorageS3::StorageS3(
const StorageID & table_id_,
const String & format_name_,
UInt64 min_upload_part_size_,
UInt64 max_single_part_upload_size_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_,
@ -201,6 +203,7 @@ StorageS3::StorageS3(
, global_context(context_.getGlobalContext())
, format_name(format_name_)
, min_upload_part_size(min_upload_part_size_)
, max_single_part_upload_size(max_single_part_upload_size_)
, compression_method(compression_method_)
, name(uri_.storage_name)
{
@ -331,9 +334,15 @@ Pipe StorageS3::read(
BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{
return std::make_shared<StorageS3BlockOutputStream>(
format_name, min_upload_part_size, metadata_snapshot->getSampleBlock(),
global_context, chooseCompressionMethod(uri.endpoint, compression_method),
client, uri.bucket, uri.key);
format_name,
metadata_snapshot->getSampleBlock(),
global_context,
chooseCompressionMethod(uri.endpoint, compression_method),
client,
uri.bucket,
uri.key,
min_upload_part_size,
max_single_part_upload_size);
}
void registerStorageS3Impl(const String & name, StorageFactory & factory)
@ -362,6 +371,7 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
}
UInt64 min_upload_part_size = args.local_context.getSettingsRef().s3_min_upload_part_size;
UInt64 max_single_part_upload_size = args.local_context.getSettingsRef().s3_max_single_part_upload_size;
String compression_method;
String format_name;
@ -383,6 +393,7 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
args.table_id,
format_name,
min_upload_part_size,
max_single_part_upload_size,
args.columns,
args.constraints,
args.context,

View File

@ -31,6 +31,7 @@ public:
const StorageID & table_id_,
const String & format_name_,
UInt64 min_upload_part_size_,
UInt64 max_single_part_upload_size_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_,
@ -59,7 +60,8 @@ private:
const Context & global_context;
String format_name;
UInt64 min_upload_part_size;
size_t min_upload_part_size;
size_t max_single_part_upload_size;
String compression_method;
std::shared_ptr<Aws::S3::S3Client> client;
String name;

View File

@ -67,6 +67,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, const C
Poco::URI uri (filename);
S3::URI s3_uri (uri);
UInt64 min_upload_part_size = context.getSettingsRef().s3_min_upload_part_size;
UInt64 max_single_part_upload_size = context.getSettingsRef().s3_max_single_part_upload_size;
StoragePtr storage = StorageS3::create(
s3_uri,
@ -75,6 +76,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, const C
StorageID(getDatabaseName(), table_name),
format,
min_upload_part_size,
max_single_part_upload_size,
getActualTableStructure(context),
ConstraintsDescription{},
const_cast<Context &>(context),
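(Editorial note, not part of the diff.) Alongside s3_min_upload_part_size, the new s3_max_single_part_upload_size setting decides whether WriteBufferFromS3 issues a single PUT or falls back to a multipart upload; the integration test below forces the multipart path by setting the threshold to 0. A hedged sketch of exercising both settings from SQL; the endpoint, bucket and schema are placeholders:
SET s3_min_upload_part_size = 5242880;        -- 5 MiB parts for the multipart path
SET s3_max_single_part_upload_size = 0;       -- 0 forces a multipart upload regardless of object size
INSERT INTO TABLE FUNCTION s3('https://storage.example.com/bucket/data.csv', 'CSV', 'n UInt64')
SELECT number AS n FROM numbers(1000000);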

View File

@ -0,0 +1,8 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
<optimize_on_insert>0</optimize_on_insert>
</default>
</profiles>
</yandex>

View File

@ -8,7 +8,8 @@ from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
main_configs=['configs/graphite_rollup.xml'])
main_configs=['configs/graphite_rollup.xml'],
user_configs=["configs/users.xml"])
q = instance.query

View File

@ -3,6 +3,8 @@
<profiles>
<default>
<allow_experimental_database_materialize_mysql>1</allow_experimental_database_materialize_mysql>
<allow_introspection_functions>1</allow_introspection_functions>
<optimize_on_insert>0</optimize_on_insert>
<default_database_engine>Ordinary</default_database_engine>
</default>
</profiles>

View File

@ -1,5 +1,10 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
<optimize_on_insert>0</optimize_on_insert>
</default>
</profiles>
<users>
<another>
<password/>
@ -10,4 +15,4 @@
<quota>default</quota>
</another>
</users>
</yandex>
</yandex>

View File

@ -306,7 +306,8 @@ def test_multipart_put(cluster, maybe_auth, positive):
cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, filename, maybe_auth, table_format)
try:
run_query(instance, put_query, stdin=csv_data, settings={'s3_min_upload_part_size': min_part_size_bytes})
run_query(instance, put_query, stdin=csv_data, settings={'s3_min_upload_part_size': min_part_size_bytes,
's3_max_single_part_upload_size': 0})
except helpers.client.QueryRuntimeException:
if positive:
raise

View File

@ -1,3 +1,5 @@
SET optimize_on_insert = 0;
DROP TABLE IF EXISTS merge_tree;
DROP TABLE IF EXISTS collapsing_merge_tree;
DROP TABLE IF EXISTS versioned_collapsing_merge_tree;

View File

@ -1,3 +1,5 @@
SET optimize_on_insert = 0;
DROP TABLE IF EXISTS summing_composite_key;
CREATE TABLE summing_composite_key (d Date, k UInt64, FirstMap Nested(k1 UInt32, k2ID Int8, s Float64), SecondMap Nested(k1ID UInt64, k2Key String, k3Type Int32, s Int64)) ENGINE = SummingMergeTree(d, k, 1);

View File

@ -42,13 +42,13 @@ $CLICKHOUSE_CLIENT -q "INSERT INTO $name (date, Sign, ki) SELECT
toDate(0) AS date,
toInt8(1) AS Sign,
toUInt64(0) AS ki
FROM system.numbers LIMIT 9000"
FROM system.numbers LIMIT 9000" --server_logs_file=/dev/null
$CLICKHOUSE_CLIENT -q "INSERT INTO $name (date, Sign, ki) SELECT
toDate(0) AS date,
toInt8(1) AS Sign,
number AS ki
FROM system.numbers LIMIT 9000, 9000"
FROM system.numbers LIMIT 9000, 9000" --server_logs_file=/dev/null
$CLICKHOUSE_CLIENT -q "INSERT INTO $name SELECT
toDate(0) AS date,
@ -67,7 +67,7 @@ number AS di09,
number AS di10,
[number, number+1] AS \`n.i\`,
[hex(number), hex(number+1)] AS \`n.s\`
FROM system.numbers LIMIT $res_rows"
FROM system.numbers LIMIT $res_rows" --server_logs_file=/dev/null
while [[ $(get_num_parts) -ne 1 ]] ; do $CLICKHOUSE_CLIENT -q "OPTIMIZE TABLE $name PARTITION 197001" --server_logs_file=/dev/null; done

View File

@ -1,3 +1,5 @@
SET optimize_on_insert = 0;
SELECT '*** Replicated with sampling ***';
DROP TABLE IF EXISTS replicated_with_sampling;

View File

@ -1,3 +1,5 @@
set optimize_on_insert = 0;
drop table if exists mult_tab;
create table mult_tab (date Date, value String, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(date, (date), 8192, sign, version);
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;

View File

@ -1,3 +1,5 @@
set optimize_on_insert = 0;
drop table if exists tab_00577;
create table tab_00577 (date Date, version UInt64, val UInt64) engine = ReplacingMergeTree(version) partition by date order by date settings enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 0;
insert into tab_00577 values ('2018-01-01', 2, 2), ('2018-01-01', 1, 1);

View File

@ -1,3 +1,5 @@
SET optimize_on_insert = 0;
DROP TABLE IF EXISTS test_00616;
DROP TABLE IF EXISTS replacing_00616;

View File

@ -1,3 +1,5 @@
SET optimize_on_insert = 0;
DROP TABLE IF EXISTS partitioned_by_tuple;
CREATE TABLE partitioned_by_tuple (d Date, x UInt8, w String, y UInt8) ENGINE SummingMergeTree (y) PARTITION BY (d, x) ORDER BY (d, x, w);

View File

@ -1,3 +1,5 @@
SET optimize_on_insert = 0;
DROP TABLE IF EXISTS partitioned_by_tuple_replica1_00661;
DROP TABLE IF EXISTS partitioned_by_tuple_replica2_00661;
CREATE TABLE partitioned_by_tuple_replica1_00661(d Date, x UInt8, w String, y UInt8) ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/test/partitioned_by_tuple_00661', '1') PARTITION BY (d, x) ORDER BY (d, x, w);

View File

@ -1,4 +1,5 @@
SET send_logs_level = 'fatal';
SET optimize_on_insert = 0;
DROP TABLE IF EXISTS old_style;
CREATE TABLE old_style(d Date, x UInt32) ENGINE MergeTree(d, x, 8192);

View File

@ -1,3 +1,5 @@
SET optimize_on_insert = 0;
SET send_logs_level = 'fatal';
DROP TABLE IF EXISTS old_style;

View File

@ -1,3 +1,5 @@
SET optimize_on_insert = 0;
select '-- SummingMergeTree with Nullable column without duplicates.';
drop table if exists tst;

View File

@ -49,3 +49,7 @@ DROP DICTIONARY dict;
DROP TABLE test_01056_dict_data.dict_data;
DROP DATABASE test_01056_dict_data;
CREATE TABLE t1 (x String) ENGINE = Memory AS SELECT 1;
SELECT x, toTypeName(x) FROM t1;
DROP TABLE t1;
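(Editorial note, not part of the test.) The appended statements appear to check that CREATE TABLE ... AS SELECT casts the selected literal to the declared column type. A sketch of the expected result:
-- x is declared as String, so the literal 1 is cast on insertion and the SELECT above should return: 1  String
SELECT CAST(1 AS String) AS x, toTypeName(x) AS type;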

View File

@ -1,3 +1,5 @@
SET optimize_on_insert = 0;
DROP TABLE IF EXISTS data_01285;
SET max_threads=1;

View File

@ -1,3 +1,5 @@
SET optimize_on_insert = 0;
DROP TABLE IF EXISTS tags;
CREATE TABLE tags (

View File

@ -1,3 +1,5 @@
SET optimize_on_insert = 0;
DROP TABLE IF EXISTS tt_01373;
CREATE TABLE tt_01373

View File

@ -0,0 +1,13 @@
Replacing Merge Tree
1 2020-01-01 00:00:00
2 2020-01-02 00:00:00
Collapsing Merge Tree
1 1 2020-01-01 00:00:00
Versioned Collapsing Merge Tree
1 1 2 2020-01-01 00:00:00
Summing Merge Tree
1 6 2020-01-01 00:00:00
2 6 2020-01-02 00:00:00
Aggregating Merge Tree
1 5 2020-01-01 00:00:00
2 5 2020-01-02 00:00:00

View File

@ -0,0 +1,35 @@
SELECT 'Replacing Merge Tree';
DROP TABLE IF EXISTS replacing_merge_tree;
CREATE TABLE replacing_merge_tree (key UInt32, date Datetime) ENGINE=ReplacingMergeTree() PARTITION BY date ORDER BY key;
INSERT INTO replacing_merge_tree VALUES (1, '2020-01-01'), (2, '2020-01-02'), (1, '2020-01-01'), (2, '2020-01-02');
SELECT * FROM replacing_merge_tree ORDER BY key;
DROP TABLE replacing_merge_tree;
SELECT 'Collapsing Merge Tree';
DROP TABLE IF EXISTS collapsing_merge_tree;
CREATE TABLE collapsing_merge_tree (key UInt32, sign Int8, date Datetime) ENGINE=CollapsingMergeTree(sign) PARTITION BY date ORDER BY key;
INSERT INTO collapsing_merge_tree VALUES (1, 1, '2020-01-01'), (2, 1, '2020-01-02'), (1, -1, '2020-01-01'), (2, -1, '2020-01-02'), (1, 1, '2020-01-01');
SELECT * FROM collapsing_merge_tree ORDER BY key;
DROP TABLE collapsing_merge_tree;
SELECT 'Versioned Collapsing Merge Tree';
DROP TABLE IF EXISTS versioned_collapsing_merge_tree;
CREATE TABLE versioned_collapsing_merge_tree (key UInt32, sign Int8, version Int32, date Datetime) ENGINE=VersionedCollapsingMergeTree(sign, version) PARTITION BY date ORDER BY (key, version);
INSERT INTO versioned_collapsing_merge_tree VALUES (1, 1, 1, '2020-01-01'), (1, -1, 1, '2020-01-01'), (1, 1, 2, '2020-01-01');
SELECT * FROM versioned_collapsing_merge_tree ORDER BY key;
DROP TABLE versioned_collapsing_merge_tree;
SELECT 'Summing Merge Tree';
DROP TABLE IF EXISTS summing_merge_tree;
CREATE TABLE summing_merge_tree (key UInt32, val UInt32, date Datetime) ENGINE=SummingMergeTree(val) PARTITION BY date ORDER BY key;
INSERT INTO summing_merge_tree VALUES (1, 1, '2020-01-01'), (2, 1, '2020-01-02'), (1, 5, '2020-01-01'), (2, 5, '2020-01-02');
SELECT * FROM summing_merge_tree ORDER BY key;
DROP TABLE summing_merge_tree;
SELECT 'Aggregating Merge Tree';
DROP TABLE IF EXISTS aggregating_merge_tree;
CREATE TABLE aggregating_merge_tree (key UInt32, val SimpleAggregateFunction(max, UInt32), date Datetime) ENGINE=AggregatingMergeTree() PARTITION BY date ORDER BY key;
INSERT INTO aggregating_merge_tree VALUES (1, 1, '2020-01-01'), (2, 1, '2020-01-02'), (1, 5, '2020-01-01'), (2, 5, '2020-01-02');
SELECT * FROM aggregating_merge_tree ORDER BY key;
DROP TABLE aggregating_merge_tree;
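(Editorial contrast, not part of the commit.) With the setting disabled, the same kind of insert keeps its duplicate rows until a merge runs; that is the behaviour the SET optimize_on_insert = 0 additions in the older tests above preserve. The table name is a placeholder:
SET optimize_on_insert = 0;
CREATE TABLE replacing_no_merge (key UInt32, date DateTime) ENGINE=ReplacingMergeTree() PARTITION BY date ORDER BY key;
INSERT INTO replacing_no_merge VALUES (1, '2020-01-01'), (2, '2020-01-02'), (1, '2020-01-01'), (2, '2020-01-02');
SELECT count() FROM replacing_no_merge;      -- 4: duplicates survive the insert
OPTIMIZE TABLE replacing_no_merge FINAL;
SELECT count() FROM replacing_no_merge;      -- 2 after an explicit merge
DROP TABLE replacing_no_merge;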

View File

@ -1,17 +1,29 @@
-- Check remerge_sort_lowered_memory_bytes_ratio setting
set max_memory_usage='4Gi';
set max_memory_usage='300Mi';
-- enter remerge once limit*2 is reached
set max_bytes_before_remerge_sort='10Mi';
-- more blocks
set max_block_size=40960;
-- default 2, slightly not enough:
--
-- MergeSortingTransform: Memory usage is lowered from 1.91 GiB to 980.00 MiB
-- the default remerge_sort_lowered_memory_bytes_ratio of 2 is slightly too high for this data
-- MergeSortingTransform: Re-merging intermediate ORDER BY data (20 blocks with 819200 rows) to save memory consumption
-- MergeSortingTransform: Memory usage is lowered from 186.25 MiB to 95.00 MiB
-- MergeSortingTransform: Re-merging is not useful (memory usage was not lowered by remerge_sort_lowered_memory_bytes_ratio=2.0)
select number k, repeat(toString(number), 101) v1, repeat(toString(number), 102) v2, repeat(toString(number), 103) v3 from numbers(toUInt64(10e6)) order by k limit 400e3 format Null; -- { serverError 241 }
select number k, repeat(toString(number), 101) v1, repeat(toString(number), 102) v2, repeat(toString(number), 103) v3 from numbers(toUInt64(10e6)) order by k limit 400e3 settings remerge_sort_lowered_memory_bytes_ratio=2. format Null; -- { serverError 241 }
select number k, repeat(toString(number), 11) v1, repeat(toString(number), 12) v2 from numbers(toUInt64(3e6)) order by k limit 400e3 format Null; -- { serverError 241 }
select number k, repeat(toString(number), 11) v1, repeat(toString(number), 12) v2 from numbers(toUInt64(3e6)) order by k limit 400e3 settings remerge_sort_lowered_memory_bytes_ratio=2. format Null; -- { serverError 241 }
-- 1.91/0.98=1.94 is good
select number k, repeat(toString(number), 101) v1, repeat(toString(number), 102) v2, repeat(toString(number), 103) v3 from numbers(toUInt64(10e6)) order by k limit 400e3 settings remerge_sort_lowered_memory_bytes_ratio=1.9 format Null;
-- remerge_sort_lowered_memory_bytes_ratio 1.9 is good (need at least 1.91/0.98=1.94)
-- MergeSortingTransform: Re-merging intermediate ORDER BY data (20 blocks with 819200 rows) to save memory consumption
-- MergeSortingTransform: Memory usage is lowered from 186.25 MiB to 95.00 MiB
-- MergeSortingTransform: Re-merging intermediate ORDER BY data (20 blocks with 809600 rows) to save memory consumption
-- MergeSortingTransform: Memory usage is lowered from 188.13 MiB to 95.00 MiB
-- MergeSortingTransform: Re-merging intermediate ORDER BY data (20 blocks with 809600 rows) to save memory consumption
-- MergeSortingTransform: Memory usage is lowered from 188.13 MiB to 95.00 MiB
-- MergeSortingTransform: Re-merging intermediate ORDER BY data (20 blocks with 809600 rows) to save memory consumption
-- MergeSortingTransform: Memory usage is lowered from 188.13 MiB to 95.00 MiB
-- MergeSortingTransform: Re-merging intermediate ORDER BY data (20 blocks with 809600 rows) to save memory consumption
-- MergeSortingTransform: Memory usage is lowered from 188.13 MiB to 95.00 MiB
-- MergeSortingTransform: Re-merging intermediate ORDER BY data (20 blocks with 809600 rows) to save memory consumption
-- MergeSortingTransform: Memory usage is lowered from 188.13 MiB to 95.00 MiB
select number k, repeat(toString(number), 11) v1, repeat(toString(number), 12) v2 from numbers(toUInt64(3e6)) order by k limit 400e3 settings remerge_sort_lowered_memory_bytes_ratio=1.9 format Null;
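(Editorial check of the ratios quoted above.) Lowering memory from 188.13 MiB to 95.00 MiB is a factor of about 1.98, which clears the 1.9 threshold used in the last two queries but not the default of 2.0; that is why the default-ratio variants are expected to fail with the memory-limit error (serverError 241):
SELECT round(188.13 / 95.00, 2) AS lowering_ratio;   -- 1.98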

View File

@ -0,0 +1,15 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
common_opts=(
"--format=Null"
"--max_threads=1"
"--max_distributed_connections=3"
)
# NOTE: the test uses a higher timeout to avoid flakiness.
timeout 9s ${CLICKHOUSE_CLIENT} "$@" "${common_opts[@]}" -q "select sleep(3) from remote('127.{1,2,3,4,5}', system.one)" --prefer_localhost_replica=0
timeout 9s ${CLICKHOUSE_CLIENT} "$@" "${common_opts[@]}" -q "select sleep(3) from remote('127.{1,2,3,4,5}', system.one)" --prefer_localhost_replica=1
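(Editorial note.) This new test presumably checks that max_distributed_connections is honoured independently of max_threads: three connections for five 3-second remote sleeps should finish in roughly two waves (about 6s), comfortably inside the 9-second timeout, whereas strictly sequential execution would take around 15s and be killed. A standalone sketch of the query the script runs; timing enforcement stays with the shell timeout above:
SELECT sleep(3)
FROM remote('127.{1,2,3,4,5}', system.one)
SETTINGS max_threads = 1, max_distributed_connections = 3, prefer_localhost_replica = 0
FORMAT Null;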

View File

@ -1,4 +1,5 @@
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>