Merge branch 'master' into Azure_write_buffer

Smita Kulkarni 2024-03-21 16:22:28 +01:00
commit 71eee6a300
46 changed files with 418 additions and 133 deletions

View File

@ -67,7 +67,6 @@ HedgedConnections::HedgedConnections(
}
active_connection_count = connections.size();
offsets_with_disabled_changing_replica = 0;
pipeline_for_new_replicas.add([throttler_](ReplicaState & replica_) { replica_.connection->setThrottler(throttler_); });
}

View File

@ -178,12 +178,12 @@ private:
std::queue<int> offsets_queue;
/// The current number of valid connections to the replicas of this shard.
size_t active_connection_count;
size_t active_connection_count = 0;
/// We count offsets in which we can't change replica anymore,
/// it's needed to cancel choosing new replicas when we
/// disabled replica changing in all offsets.
size_t offsets_with_disabled_changing_replica;
size_t offsets_with_disabled_changing_replica = 0;
Pipeline pipeline_for_new_replicas;

View File

@ -1,3 +1,7 @@
if (ENABLE_EXAMPLES)
add_subdirectory (examples)
endif ()
if (ENABLE_BENCHMARKS)
add_subdirectory(benchmarks)
endif()

View File

@ -150,6 +150,8 @@ public:
++s;
}
void insertManyFrom(const IColumn & /*src*/, size_t /* position */, size_t length) override { s += length; }
void insertDefault() override
{
++s;

View File

@ -56,6 +56,13 @@ public:
void shrinkToFit() override { data.shrink_to_fit(); }
void insertFrom(const IColumn & src, size_t n) override { data.push_back(static_cast<const Self &>(src).getData()[n]); }
void insertManyFrom(const IColumn & src, size_t position, size_t length) override
{
ValueType v = assert_cast<const Self &>(src).getData()[position];
data.resize_fill(data.size() + length, v);
}
void insertData(const char * src, size_t /*length*/) override;
void insertDefault() override { data.push_back(T()); }
void insertManyDefaults(size_t length) override { data.resize_fill(data.size() + length); }

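For orientation, a minimal sketch (not part of this commit) of the call pattern the override above specializes: insertManyFrom(src, position, length) is the bulk counterpart of calling insertFrom(src, position) length times, here done with a single resize_fill instead of repeated virtual calls. The helper name below is illustrative.

#include <cstddef>
#include <Columns/IColumn.h>

using namespace DB;

/// Sketch: copy the value at row `position` of `src` into `dst` `length` times.
void replicateValue(IColumn & dst, const IColumn & src, size_t position, size_t length)
{
    /// Bulk path added in this commit for the concrete column types.
    dst.insertManyFrom(src, position, length);

    /// Presumably equivalent to repeating the single-row insert:
    /// for (size_t i = 0; i < length; ++i)
    ///     dst.insertFrom(src, position);
}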
View File

@ -85,6 +85,20 @@ void ColumnFixedString::insertFrom(const IColumn & src_, size_t index)
memcpySmallAllowReadWriteOverflow15(chars.data() + old_size, &src.chars[n * index], n);
}
void ColumnFixedString::insertManyFrom(const IColumn & src, size_t position, size_t length)
{
const ColumnFixedString & src_concrete = assert_cast<const ColumnFixedString &>(src);
if (n != src_concrete.getN())
throw Exception(ErrorCodes::SIZE_OF_FIXED_STRING_DOESNT_MATCH, "Size of FixedString doesn't match");
const size_t old_size = chars.size();
const size_t new_size = old_size + n * length;
chars.resize(new_size);
for (size_t offset = old_size; offset < new_size; offset += n)
memcpySmallAllowReadWriteOverflow15(&chars[offset], &src_concrete.chars[n * position], n);
}
void ColumnFixedString::insertData(const char * pos, size_t length)
{
if (length > n)

View File

@ -100,6 +100,8 @@ public:
void insertFrom(const IColumn & src_, size_t index) override;
void insertManyFrom(const IColumn & src, size_t position, size_t length) override;
void insertData(const char * pos, size_t length) override;
void insertDefault() override

View File

@ -158,6 +158,11 @@ void ColumnMap::insertFrom(const IColumn & src, size_t n)
nested->insertFrom(assert_cast<const ColumnMap &>(src).getNestedColumn(), n);
}
void ColumnMap::insertManyFrom(const IColumn & src, size_t position, size_t length)
{
assert_cast<ColumnArray &>(*nested).insertManyFrom(assert_cast<const ColumnMap &>(src).getNestedColumn(), position, length);
}
void ColumnMap::insertRangeFrom(const IColumn & src, size_t start, size_t length)
{
nested->insertRangeFrom(

View File

@ -67,6 +67,7 @@ public:
void updateWeakHash32(WeakHash32 & hash) const override;
void updateHashFast(SipHash & hash) const override;
void insertFrom(const IColumn & src_, size_t n) override;
void insertManyFrom(const IColumn & src, size_t position, size_t length) override;
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
void expand(const Filter & mask, bool inverted) override;

View File

@ -231,6 +231,14 @@ void ColumnNullable::insertFrom(const IColumn & src, size_t n)
getNullMapData().push_back(src_concrete.getNullMapData()[n]);
}
void ColumnNullable::insertManyFrom(const IColumn & src, size_t position, size_t length)
{
const ColumnNullable & src_concrete = assert_cast<const ColumnNullable &>(src);
getNestedColumn().insertManyFrom(src_concrete.getNestedColumn(), position, length);
getNullMapColumn().insertManyFrom(src_concrete.getNullMapColumn(), position, length);
}
void ColumnNullable::insertFromNotNullable(const IColumn & src, size_t n)
{
getNestedColumn().insertFrom(src, n);

View File

@ -69,6 +69,7 @@ public:
void insert(const Field & x) override;
bool tryInsert(const Field & x) override;
void insertFrom(const IColumn & src, size_t n) override;
void insertManyFrom(const IColumn & src, size_t position, size_t length) override;
void insertFromNotNullable(const IColumn & src, size_t n);
void insertRangeFromNotNullable(const IColumn & src, size_t start, size_t length);

View File

@ -38,6 +38,27 @@ ColumnString::ColumnString(const ColumnString & src)
last_offset, chars.size());
}
void ColumnString::insertManyFrom(const IColumn & src, size_t position, size_t length)
{
const ColumnString & src_concrete = assert_cast<const ColumnString &>(src);
const UInt8 * src_buf = &src_concrete.chars[src_concrete.offsets[position - 1]];
const size_t src_buf_size
= src_concrete.offsets[position] - src_concrete.offsets[position - 1]; /// -1th index is Ok, see PaddedPODArray.
const size_t old_size = chars.size();
const size_t new_size = old_size + src_buf_size * length;
chars.resize(new_size);
const size_t old_rows = offsets.size();
offsets.resize(old_rows + length);
for (size_t current_offset = old_size; current_offset < new_size; current_offset += src_buf_size)
memcpySmallAllowReadWriteOverflow15(&chars[current_offset], src_buf, src_buf_size);
for (size_t i = 0, current_offset = old_size + src_buf_size; i < length; ++i, current_offset += src_buf_size)
offsets[old_rows + i] = current_offset;
}
MutableColumnPtr ColumnString::cloneResized(size_t to_size) const
{

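The offsets arithmetic above relies on the convention that offsets[i] stores the end position of row i inside chars, with offsets[-1] readable as 0 thanks to PaddedPODArray. A standalone sketch of the same bookkeeping with plain std::vector, purely illustrative and not the ClickHouse types:

#include <cstddef>
#include <vector>

/// Replicate the string at `position` `length` times: append its bytes to `chars`
/// and record one new end offset per inserted row.
void insertManyFromSketch(std::vector<char> & chars, std::vector<size_t> & offsets,
                          const std::vector<char> & src_chars, const std::vector<size_t> & src_offsets,
                          size_t position, size_t length)
{
    const size_t begin = position == 0 ? 0 : src_offsets[position - 1];
    const size_t size = src_offsets[position] - begin;
    for (size_t i = 0; i < length; ++i)
    {
        chars.insert(chars.end(), src_chars.begin() + begin, src_chars.begin() + begin + size);
        offsets.push_back(chars.size());
    }
}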
View File

@ -160,6 +160,8 @@ public:
}
}
void insertManyFrom(const IColumn & src, size_t position, size_t length) override;
void insertData(const char * pos, size_t length) override
{
const size_t old_size = chars.size();

View File

@ -185,6 +185,18 @@ void ColumnTuple::insertFrom(const IColumn & src_, size_t n)
columns[i]->insertFrom(*src.columns[i], n);
}
void ColumnTuple::insertManyFrom(const IColumn & src, size_t position, size_t length)
{
const ColumnTuple & src_tuple = assert_cast<const ColumnTuple &>(src);
const size_t tuple_size = columns.size();
if (src_tuple.columns.size() != tuple_size)
throw Exception(ErrorCodes::CANNOT_INSERT_VALUE_OF_DIFFERENT_SIZE_INTO_TUPLE, "Cannot insert value of different size into tuple");
for (size_t i = 0; i < tuple_size; ++i)
columns[i]->insertManyFrom(*src_tuple.columns[i], position, length);
}
void ColumnTuple::insertDefault()
{
for (auto & column : columns)

View File

@ -60,6 +60,7 @@ public:
void insert(const Field & x) override;
bool tryInsert(const Field & x) override;
void insertFrom(const IColumn & src_, size_t n) override;
void insertManyFrom(const IColumn & src, size_t position, size_t length) override;
void insertDefault() override;
void popBack(size_t n) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;

View File

@ -0,0 +1,4 @@
clickhouse_add_executable(column_insert_many_from benchmark_column_insert_many_from.cpp)
target_link_libraries (column_insert_many_from PRIVATE
ch_contrib::gbenchmark_all
dbms)

View File

@ -0,0 +1,102 @@
#include <cstddef>
#include <Columns/IColumn.h>
#include <Core/Block.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/IDataType.h>
#include <base/types.h>
#include <benchmark/benchmark.h>
using namespace DB;
static constexpr size_t ROWS = 65536;
static ColumnPtr mockColumn(const DataTypePtr & type, size_t rows)
{
const auto * type_array = typeid_cast<const DataTypeArray *>(type.get());
if (type_array)
{
auto data_col = mockColumn(type_array->getNestedType(), rows);
auto offset_col = ColumnArray::ColumnOffsets::create(rows);
auto & offsets = offset_col->getData();
for (size_t i = 0; i < data_col->size(); ++i)
offsets[i] = offsets[i - 1] + (rand() % 10);
auto new_data_col = data_col->replicate(offsets);
return ColumnArray::create(new_data_col, std::move(offset_col));
}
auto type_not_nullable = removeNullable(type);
auto column = type->createColumn();
for (size_t i = 0; i < rows; ++i)
{
if (i % 100)
column->insertDefault();
else if (isInt(type_not_nullable))
column->insert(i);
else if (isFloat(type_not_nullable))
{
double d = i * 1.0;
column->insert(d);
}
else if (isString(type_not_nullable))
{
String s = "helloworld";
column->insert(s);
}
else
column->insertDefault();
}
return std::move(column);
}
static NO_INLINE void insertManyFrom(IColumn & dst, const IColumn & src)
{
size_t size = src.size();
dst.insertManyFrom(src, size / 2, size);
}
template <const std::string & str_type>
static void BM_insertManyFrom(benchmark::State & state)
{
auto type = DataTypeFactory::instance().get(str_type);
auto src = mockColumn(type, ROWS);
for (auto _ : state)
{
state.PauseTiming();
auto dst = type->createColumn();
dst->reserve(ROWS);
state.ResumeTiming();
insertManyFrom(*dst, *src);
benchmark::DoNotOptimize(dst);
}
}
static const String type_int64 = "Int64";
static const String type_nullable_int64 = "Nullable(Int64)";
static const String type_string = "String";
static const String type_nullable_string = "Nullable(String)";
static const String type_decimal = "Decimal128(3)";
static const String type_nullable_decimal = "Nullable(Decimal128(3))";
static const String type_array_int64 = "Array(Int64)";
static const String type_array_nullable_int64 = "Array(Nullable(Int64))";
static const String type_array_string = "Array(String)";
static const String type_array_nullable_string = "Array(Nullable(String))";
BENCHMARK_TEMPLATE(BM_insertManyFrom, type_int64);
BENCHMARK_TEMPLATE(BM_insertManyFrom, type_nullable_int64);
BENCHMARK_TEMPLATE(BM_insertManyFrom, type_string);
BENCHMARK_TEMPLATE(BM_insertManyFrom, type_nullable_string);
BENCHMARK_TEMPLATE(BM_insertManyFrom, type_decimal);
BENCHMARK_TEMPLATE(BM_insertManyFrom, type_nullable_decimal);
BENCHMARK_TEMPLATE(BM_insertManyFrom, type_array_int64);
BENCHMARK_TEMPLATE(BM_insertManyFrom, type_array_nullable_int64);
BENCHMARK_TEMPLATE(BM_insertManyFrom, type_array_string);
BENCHMARK_TEMPLATE(BM_insertManyFrom, type_array_nullable_string);

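This benchmark is built only when ENABLE_BENCHMARKS is set (see the CMake hunk earlier in this commit). Adding a further case would presumably follow the same pattern as the registrations above; the FixedString type below is an illustrative choice, not part of the commit:

static const String type_fixed_string = "FixedString(16)";
BENCHMARK_TEMPLATE(BM_insertManyFrom, type_fixed_string);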
View File

@ -325,12 +325,12 @@ void KeeperDispatcher::snapshotThread()
if (!snapshots_queue.pop(task))
break;
if (shutdown_called)
break;
try
{
auto snapshot_file_info = task.create_snapshot(std::move(task.snapshot));
auto snapshot_file_info = task.create_snapshot(std::move(task.snapshot), /*execute_only_cleanup=*/shutdown_called);
if (shutdown_called)
break;
if (snapshot_file_info.path.empty())
continue;

View File

@ -98,8 +98,7 @@ struct SnapshotFileInfo
};
using KeeperStorageSnapshotPtr = std::shared_ptr<KeeperStorageSnapshot>;
using CreateSnapshotCallback = std::function<SnapshotFileInfo(KeeperStorageSnapshotPtr &&)>;
using CreateSnapshotCallback = std::function<SnapshotFileInfo(KeeperStorageSnapshotPtr &&, bool)>;
using SnapshotMetaAndStorage = std::pair<SnapshotMetadataPtr, KeeperStoragePtr>;

View File

@ -564,63 +564,65 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
}
/// create snapshot task for background execution (in snapshot thread)
snapshot_task.create_snapshot = [this, when_done](KeeperStorageSnapshotPtr && snapshot)
snapshot_task.create_snapshot = [this, when_done](KeeperStorageSnapshotPtr && snapshot, bool execute_only_cleanup)
{
nuraft::ptr<std::exception> exception(nullptr);
bool ret = true;
try
if (!execute_only_cleanup)
{
{ /// Read storage data without locks and create snapshot
std::lock_guard lock(snapshots_lock);
try
{
{ /// Read storage data without locks and create snapshot
std::lock_guard lock(snapshots_lock);
if (latest_snapshot_meta && snapshot->snapshot_meta->get_last_log_idx() <= latest_snapshot_meta->get_last_log_idx())
{
LOG_INFO(
log,
"Will not create a snapshot with last log idx {} because a snapshot with bigger last log idx ({}) is already "
"created",
snapshot->snapshot_meta->get_last_log_idx(),
latest_snapshot_meta->get_last_log_idx());
}
else
{
latest_snapshot_meta = snapshot->snapshot_meta;
/// we rely on the fact that the snapshot disk cannot be changed during runtime
if (isLocalDisk(*keeper_context->getLatestSnapshotDisk()))
if (latest_snapshot_meta && snapshot->snapshot_meta->get_last_log_idx() <= latest_snapshot_meta->get_last_log_idx())
{
auto snapshot_info = snapshot_manager.serializeSnapshotToDisk(*snapshot);
latest_snapshot_info = std::move(snapshot_info);
latest_snapshot_buf = nullptr;
LOG_INFO(
log,
"Will not create a snapshot with last log idx {} because a snapshot with bigger last log idx ({}) is already "
"created",
snapshot->snapshot_meta->get_last_log_idx(),
latest_snapshot_meta->get_last_log_idx());
}
else
{
auto snapshot_buf = snapshot_manager.serializeSnapshotToBuffer(*snapshot);
auto snapshot_info = snapshot_manager.serializeSnapshotBufferToDisk(*snapshot_buf, snapshot->snapshot_meta->get_last_log_idx());
latest_snapshot_info = std::move(snapshot_info);
latest_snapshot_buf = std::move(snapshot_buf);
}
latest_snapshot_meta = snapshot->snapshot_meta;
/// we rely on the fact that the snapshot disk cannot be changed during runtime
if (isLocalDisk(*keeper_context->getLatestSnapshotDisk()))
{
auto snapshot_info = snapshot_manager.serializeSnapshotToDisk(*snapshot);
latest_snapshot_info = std::move(snapshot_info);
latest_snapshot_buf = nullptr;
}
else
{
auto snapshot_buf = snapshot_manager.serializeSnapshotToBuffer(*snapshot);
auto snapshot_info = snapshot_manager.serializeSnapshotBufferToDisk(*snapshot_buf, snapshot->snapshot_meta->get_last_log_idx());
latest_snapshot_info = std::move(snapshot_info);
latest_snapshot_buf = std::move(snapshot_buf);
}
ProfileEvents::increment(ProfileEvents::KeeperSnapshotCreations);
LOG_DEBUG(log, "Created persistent snapshot {} with path {}", latest_snapshot_meta->get_last_log_idx(), latest_snapshot_info.path);
ProfileEvents::increment(ProfileEvents::KeeperSnapshotCreations);
LOG_DEBUG(log, "Created persistent snapshot {} with path {}", latest_snapshot_meta->get_last_log_idx(), latest_snapshot_info.path);
}
}
}
catch (...)
{
/// Destroy snapshot with lock
std::lock_guard lock(storage_and_responses_lock);
LOG_TRACE(log, "Clearing garbage after snapshot");
/// Turn off "snapshot mode" and clear outdate part of storage state
storage->clearGarbageAfterSnapshot();
LOG_TRACE(log, "Cleared garbage after snapshot");
snapshot.reset();
ProfileEvents::increment(ProfileEvents::KeeperSnapshotCreationsFailed);
LOG_TRACE(log, "Exception happened during snapshot");
tryLogCurrentException(log);
ret = false;
}
}
catch (...)
{
ProfileEvents::increment(ProfileEvents::KeeperSnapshotCreationsFailed);
LOG_TRACE(log, "Exception happened during snapshot");
tryLogCurrentException(log);
ret = false;
/// Destroy snapshot with lock
std::lock_guard lock(storage_and_responses_lock);
LOG_TRACE(log, "Clearing garbage after snapshot");
/// Turn off "snapshot mode" and clear outdate part of storage state
storage->clearGarbageAfterSnapshot();
LOG_TRACE(log, "Cleared garbage after snapshot");
snapshot.reset();
}
when_done(ret, exception);
@ -628,11 +630,10 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
return ret ? latest_snapshot_info : SnapshotFileInfo{};
};
if (keeper_context->getServerState() == KeeperContext::Phase::SHUTDOWN)
{
LOG_INFO(log, "Creating a snapshot during shutdown because 'create_snapshot_on_exit' is enabled.");
auto snapshot_file_info = snapshot_task.create_snapshot(std::move(snapshot_task.snapshot));
auto snapshot_file_info = snapshot_task.create_snapshot(std::move(snapshot_task.snapshot), /*execute_only_cleanup=*/false);
if (!snapshot_file_info.path.empty() && snapshot_manager_s3)
{

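For reference, a condensed sketch of how the widened callback is now driven. runSnapshotTask is a hypothetical wrapper; the /*execute_only_cleanup=*/ argument mirrors the call sites in KeeperDispatcher::snapshotThread and KeeperStateMachine::create_snapshot shown in this commit.

/// The bool asks the task to skip snapshot serialization and only clear
/// snapshot-mode garbage (used when shutdown has already been requested).
SnapshotFileInfo runSnapshotTask(
    const CreateSnapshotCallback & create_snapshot,
    KeeperStorageSnapshotPtr && snapshot,
    bool shutdown_called)
{
    return create_snapshot(std::move(snapshot), /*execute_only_cleanup=*/shutdown_called);
}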
View File

@ -1818,7 +1818,7 @@ void testLogAndStateMachine(
bool pop_result = snapshots_queue.pop(snapshot_task);
EXPECT_TRUE(pop_result);
snapshot_task.create_snapshot(std::move(snapshot_task.snapshot));
snapshot_task.create_snapshot(std::move(snapshot_task.snapshot), /*execute_only_cleanup=*/false);
}
if (snapshot_created && changelog.size() > settings->reserved_log_items)

View File

@ -75,10 +75,9 @@ void DatabaseReplicatedDDLWorker::initializeReplication()
String active_path = fs::path(database->replica_path) / "active";
String active_id = toString(ServerUUID::get());
zookeeper->deleteEphemeralNodeIfContentMatches(active_path, active_id);
zookeeper->create(active_path, active_id, zkutil::CreateMode::Ephemeral);
if (active_node_holder)
active_node_holder->setAlreadyRemoved();
active_node_holder.reset();
active_node_holder_zookeeper = zookeeper;
active_node_holder = zkutil::EphemeralNodeHolder::existing(active_path, *active_node_holder_zookeeper);
String log_ptr_str = zookeeper->get(database->replica_path + "/log_ptr");
UInt32 our_log_ptr = parse<UInt32>(log_ptr_str);
@ -127,9 +126,15 @@ void DatabaseReplicatedDDLWorker::initializeReplication()
initializeLogPointer(log_entry_name);
}
std::lock_guard lock{database->metadata_mutex};
if (!database->checkDigestValid(context, false))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Inconsistent database metadata after reconnection to ZooKeeper");
{
std::lock_guard lock{database->metadata_mutex};
if (!database->checkDigestValid(context, false))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Inconsistent database metadata after reconnection to ZooKeeper");
}
zookeeper->create(active_path, active_id, zkutil::CreateMode::Ephemeral);
active_node_holder_zookeeper = zookeeper;
active_node_holder = zkutil::EphemeralNodeHolder::existing(active_path, *active_node_holder_zookeeper);
}
String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry)

View File

@ -621,9 +621,9 @@ public:
}
else
{
if (!WhichDataType(arguments[0].type).isDateTime())
if (!WhichDataType(arguments[0].type).isDateTimeOrDateTime64())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of first argument of function {}. "
"Must be a DateTime", arguments[0].type->getName(), getName());
"Must be a DateTime/DateTime64", arguments[0].type->getName(), getName());
if (!WhichDataType(arguments[2].type).isString())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of third argument of function {}. "

View File

@ -147,6 +147,9 @@ ReturnType parseDateTimeBestEffortImpl(
{
has_comma_between_date_and_time = true;
++in.position();
if (in.eof())
break;
}
}

View File

@ -44,7 +44,7 @@ public:
void setStorageLimits(const std::shared_ptr<const StorageLimitsList> & storage_limits_) override;
/// Default implementation for all the sources.
std::optional<ReadProgress> getReadProgress() final;
std::optional<ReadProgress> getReadProgress() override;
void addTotalRowsApprox(size_t value);
void addTotalBytes(size_t value);

View File

@ -80,6 +80,8 @@ namespace
return convertToChunk(block);
}
std::optional<ReadProgress> getReadProgress() override { return std::nullopt; }
private:
TemporaryFileStream * tmp_stream;
};

View File

@ -131,7 +131,7 @@ std::optional<MergeTreePartInfo> MergeTreePartInfo::tryParsePartName(
/// "Part 20170601_20170630_0_2_999999999 intersects 201706_0_1_4294967295".
/// So we replace unexpected max level to make contains(...) method and comparison operators work
/// correctly with such virtual parts. On part name serialization we will use legacy max level to keep the name unchanged.
part_info.use_leagcy_max_level = true;
part_info.use_legacy_max_level = true;
level = MAX_LEVEL;
}
@ -205,7 +205,7 @@ String MergeTreePartInfo::getPartNameV1() const
writeChar('_', wb);
writeIntText(max_block, wb);
writeChar('_', wb);
if (use_leagcy_max_level)
if (use_legacy_max_level)
{
assert(level == MAX_LEVEL);
writeIntText(LEGACY_MAX_LEVEL, wb);
@ -244,7 +244,7 @@ String MergeTreePartInfo::getPartNameV0(DayNum left_date, DayNum right_date) con
writeChar('_', wb);
writeIntText(max_block, wb);
writeChar('_', wb);
if (use_leagcy_max_level)
if (use_legacy_max_level)
{
assert(level == MAX_LEVEL);
writeIntText(LEGACY_MAX_LEVEL, wb);
@ -274,7 +274,7 @@ void MergeTreePartInfo::serialize(WriteBuffer & out) const
writeIntBinary(max_block, out);
writeIntBinary(level, out);
writeIntBinary(mutation, out);
writeBoolText(use_leagcy_max_level, out);
writeBoolText(use_legacy_max_level, out);
}
@ -297,7 +297,7 @@ void MergeTreePartInfo::deserialize(ReadBuffer & in)
readIntBinary(max_block, in);
readIntBinary(level, in);
readIntBinary(mutation, in);
readBoolText(use_leagcy_max_level, in);
readBoolText(use_legacy_max_level, in);
}
bool MergeTreePartInfo::areAllBlockNumbersCovered(const MergeTreePartInfo & blocks_range, std::vector<MergeTreePartInfo> candidates)

View File

@ -26,7 +26,7 @@ struct MergeTreePartInfo
UInt32 level = 0;
Int64 mutation = 0; /// If the part has been mutated or contains mutated parts, is equal to mutation version number.
bool use_leagcy_max_level = false; /// For compatibility. TODO remove it
bool use_legacy_max_level = false; /// For compatibility. TODO remove it
MergeTreePartInfo() = default;

View File

@ -23,6 +23,7 @@
#include <Columns/ColumnConst.h>
#include <Common/threadPoolCallbackRunner.h>
#include <Common/Macros.h>
#include <Common/ProfileEvents.h>
#include <Common/escapeForFileName.h>
@ -282,6 +283,17 @@ size_t getClusterQueriedNodes(const Settings & settings, const ClusterPtr & clus
return (num_remote_shards + num_local_shards) * settings.max_parallel_replicas;
}
template <class F>
void waitFutures(F & futures)
{
for (auto & future : futures)
future.wait();
/// Make sure there is no exception.
for (auto & future : futures)
future.get();
futures.clear();
}
}
/// For destruction of std::unique_ptr of type that is incomplete in class definition.
@ -1286,25 +1298,30 @@ void StorageDistributed::initializeFromDisk()
/// Make initialization for large number of disks parallel.
ThreadPool pool(CurrentMetrics::StorageDistributedThreads, CurrentMetrics::StorageDistributedThreadsActive, CurrentMetrics::StorageDistributedThreadsScheduled, disks.size());
std::vector<std::future<void>> futures;
for (const DiskPtr & disk : disks)
{
pool.scheduleOrThrowOnError([&]()
auto future = scheduleFromThreadPool<void>([this, disk_to_init = disk]
{
initializeDirectoryQueuesForDisk(disk);
});
initializeDirectoryQueuesForDisk(disk_to_init);
}, pool, "DistInit");
futures.push_back(std::move(future));
}
waitFutures(futures);
pool.wait();
const auto & paths = getDataPaths();
std::vector<UInt64> last_increment(paths.size());
for (size_t i = 0; i < paths.size(); ++i)
{
pool.scheduleOrThrowOnError([&, i]()
auto future = scheduleFromThreadPool<void>([&paths, &last_increment, i]
{
last_increment[i] = getMaximumFileNumber(paths[i]);
});
}, pool, "DistInit");
futures.push_back(std::move(future));
}
waitFutures(futures);
pool.wait();
for (const auto inc : last_increment)
@ -1734,16 +1751,33 @@ void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context)
directory_queues.push_back(node.second.directory_queue);
}
bool need_flush = getDistributedSettingsRef().flush_on_detach;
if (!need_flush)
if (getDistributedSettingsRef().flush_on_detach)
{
LOG_INFO(log, "Flushing pending INSERT blocks");
Stopwatch watch;
ThreadPool pool(CurrentMetrics::StorageDistributedThreads, CurrentMetrics::StorageDistributedThreadsActive, CurrentMetrics::StorageDistributedThreadsScheduled, directory_queues.size());
std::vector<std::future<void>> futures;
for (const auto & node : directory_queues)
{
auto future = scheduleFromThreadPool<void>([node_to_flush = node]
{
node_to_flush->flushAllData();
}, pool, "DistFlush");
futures.push_back(std::move(future));
}
waitFutures(futures);
pool.wait();
LOG_INFO(log, "Pending INSERT blocks flushed, took {} ms.", watch.elapsedMilliseconds());
}
else
{
LOG_INFO(log, "Skip flushing data (due to flush_on_detach=0)");
/// TODO: Maybe it should be executed in parallel
for (auto & node : directory_queues)
{
if (need_flush)
node->flushAllData();
else
for (auto & node : directory_queues)
node->shutdownWithoutFlush();
}
}

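The waitFutures helper introduced above waits on every future before calling get(), so all scheduled jobs run to completion and the first stored exception is only rethrown afterwards. A self-contained sketch of the same pattern using the standard library (illustrative; the real code uses scheduleFromThreadPool and a ThreadPool):

#include <functional>
#include <future>
#include <vector>

/// Run all tasks concurrently; let every task finish, then rethrow the first failure.
void runAll(const std::vector<std::function<void()>> & tasks)
{
    std::vector<std::future<void>> futures;
    futures.reserve(tasks.size());
    for (const auto & task : tasks)
        futures.push_back(std::async(std::launch::async, task));

    /// Wait first so no task is abandoned if another one threw.
    for (auto & future : futures)
        future.wait();
    /// get() rethrows the exception stored by a failed task, if any.
    for (auto & future : futures)
        future.get();
}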
View File

@ -189,7 +189,7 @@ StoragesInfoStream::StoragesInfoStream(const ActionsDAG::Node * predicate, Conte
block_to_filter.insert(ColumnWithTypeAndName(std::move(table_column_mut), std::make_shared<DataTypeString>(), "table"));
block_to_filter.insert(ColumnWithTypeAndName(std::move(engine_column_mut), std::make_shared<DataTypeString>(), "engine"));
block_to_filter.insert(ColumnWithTypeAndName(std::move(active_column_mut), std::make_shared<DataTypeUInt8>(), "active"));
block_to_filter.insert(ColumnWithTypeAndName(std::move(storage_uuid_column_mut), std::make_shared<DataTypeUUID>(), "uuid"));
block_to_filter.insert(ColumnWithTypeAndName(std::move(storage_uuid_column_mut), std::make_shared<DataTypeUUID>(), "storage_uuid"));
if (rows)
{
@ -201,7 +201,7 @@ StoragesInfoStream::StoragesInfoStream(const ActionsDAG::Node * predicate, Conte
database_column = block_to_filter.getByName("database").column;
table_column = block_to_filter.getByName("table").column;
active_column = block_to_filter.getByName("active").column;
storage_uuid_column = block_to_filter.getByName("uuid").column;
storage_uuid_column = block_to_filter.getByName("storage_uuid").column;
}
class ReadFromSystemPartsBase : public SourceStepWithFilter

View File

@ -100,20 +100,28 @@ def is_python(file: Union[Path, str]) -> bool:
"""returns if the changed file in the repository is python script"""
# WARNING: python-magic v2:0.4.24-2 is used in ubuntu 22.04,
# and `Support os.PathLike values in magic.from_file` is only from 0.4.25
return bool(
magic.from_file(os.path.join(REPO_COPY, file), mime=True)
== "text/x-script.python"
)
try:
return bool(
magic.from_file(os.path.join(REPO_COPY, file), mime=True)
== "text/x-script.python"
)
except IsADirectoryError:
# Process submodules w/o errors
return False
def is_shell(file: Union[Path, str]) -> bool:
"""returns if the changed file in the repository is shell script"""
# WARNING: python-magic v2:0.4.24-2 is used in ubuntu 22.04,
# and `Support os.PathLike values in magic.from_file` is only from 0.4.25
return bool(
magic.from_file(os.path.join(REPO_COPY, file), mime=True)
== "text/x-shellscript"
)
try:
return bool(
magic.from_file(os.path.join(REPO_COPY, file), mime=True)
== "text/x-shellscript"
)
except IsADirectoryError:
# Process submodules w/o errors
return False
def main():
@ -135,8 +143,8 @@ def main():
run_python_check = True
if CI and pr_info.number > 0:
pr_info.fetch_changed_files()
run_cpp_check = not any(
is_python(file) or is_shell(file) for file in pr_info.changed_files
run_cpp_check = any(
not (is_python(file) or is_shell(file)) for file in pr_info.changed_files
)
run_shell_check = any(is_shell(file) for file in pr_info.changed_files)
run_python_check = any(is_python(file) for file in pr_info.changed_files)

View File

@ -1138,6 +1138,10 @@ def test_sync_replica(started_cluster):
dummy_node.query("SYSTEM SYNC DATABASE REPLICA test_sync_database")
assert "2\n" == main_node.query(
"SELECT sum(is_active) FROM system.clusters WHERE cluster='test_sync_database'"
)
assert dummy_node.query(
"SELECT count() FROM system.tables where database='test_sync_database'"
).strip() == str(number_of_tables)

View File

@ -241,82 +241,82 @@ SELECT toYYYYMMDDhhmmss(N, \'Asia/Istanbul\')
SELECT addYears(N, 1, \'Asia/Istanbul\')
Code: 43
"DateTime('Asia/Istanbul')","2020-09-16 19:20:11"
Code: 43
"DateTime64(3, 'Asia/Istanbul')","2020-09-16 19:20:11.234"
------------------------------------------
SELECT addMonths(N, 1, \'Asia/Istanbul\')
Code: 43
"DateTime('Asia/Istanbul')","2019-10-16 19:20:11"
Code: 43
"DateTime64(3, 'Asia/Istanbul')","2019-10-16 19:20:11.234"
------------------------------------------
SELECT addWeeks(N, 1, \'Asia/Istanbul\')
Code: 43
"DateTime('Asia/Istanbul')","2019-09-23 19:20:11"
Code: 43
"DateTime64(3, 'Asia/Istanbul')","2019-09-23 19:20:11.234"
------------------------------------------
SELECT addDays(N, 1, \'Asia/Istanbul\')
Code: 43
"DateTime('Asia/Istanbul')","2019-09-17 19:20:11"
Code: 43
"DateTime64(3, 'Asia/Istanbul')","2019-09-17 19:20:11.234"
------------------------------------------
SELECT addHours(N, 1, \'Asia/Istanbul\')
Code: 43
"DateTime('Asia/Istanbul')","2019-09-16 20:20:11"
Code: 43
"DateTime64(3, 'Asia/Istanbul')","2019-09-16 20:20:11.234"
------------------------------------------
SELECT addMinutes(N, 1, \'Asia/Istanbul\')
Code: 43
"DateTime('Asia/Istanbul')","2019-09-16 19:21:11"
Code: 43
"DateTime64(3, 'Asia/Istanbul')","2019-09-16 19:21:11.234"
------------------------------------------
SELECT addSeconds(N, 1, \'Asia/Istanbul\')
Code: 43
"DateTime('Asia/Istanbul')","2019-09-16 19:20:12"
Code: 43
"DateTime64(3, 'Asia/Istanbul')","2019-09-16 19:20:12.234"
------------------------------------------
SELECT addQuarters(N, 1, \'Asia/Istanbul\')
Code: 43
"DateTime('Asia/Istanbul')","2019-12-16 19:20:11"
Code: 43
"DateTime64(3, 'Asia/Istanbul')","2019-12-16 19:20:11.234"
------------------------------------------
SELECT subtractYears(N, 1, \'Asia/Istanbul\')
Code: 43
"DateTime('Asia/Istanbul')","2018-09-16 19:20:11"
Code: 43
"DateTime64(3, 'Asia/Istanbul')","2018-09-16 19:20:11.234"
------------------------------------------
SELECT subtractMonths(N, 1, \'Asia/Istanbul\')
Code: 43
"DateTime('Asia/Istanbul')","2019-08-16 19:20:11"
Code: 43
"DateTime64(3, 'Asia/Istanbul')","2019-08-16 19:20:11.234"
------------------------------------------
SELECT subtractWeeks(N, 1, \'Asia/Istanbul\')
Code: 43
"DateTime('Asia/Istanbul')","2019-09-09 19:20:11"
Code: 43
"DateTime64(3, 'Asia/Istanbul')","2019-09-09 19:20:11.234"
------------------------------------------
SELECT subtractDays(N, 1, \'Asia/Istanbul\')
Code: 43
"DateTime('Asia/Istanbul')","2019-09-15 19:20:11"
Code: 43
"DateTime64(3, 'Asia/Istanbul')","2019-09-15 19:20:11.234"
------------------------------------------
SELECT subtractHours(N, 1, \'Asia/Istanbul\')
Code: 43
"DateTime('Asia/Istanbul')","2019-09-16 18:20:11"
Code: 43
"DateTime64(3, 'Asia/Istanbul')","2019-09-16 18:20:11.234"
------------------------------------------
SELECT subtractMinutes(N, 1, \'Asia/Istanbul\')
Code: 43
"DateTime('Asia/Istanbul')","2019-09-16 19:19:11"
Code: 43
"DateTime64(3, 'Asia/Istanbul')","2019-09-16 19:19:11.234"
------------------------------------------
SELECT subtractSeconds(N, 1, \'Asia/Istanbul\')
Code: 43
"DateTime('Asia/Istanbul')","2019-09-16 19:20:10"
Code: 43
"DateTime64(3, 'Asia/Istanbul')","2019-09-16 19:20:10.234"
------------------------------------------
SELECT subtractQuarters(N, 1, \'Asia/Istanbul\')
Code: 43
"DateTime('Asia/Istanbul')","2019-06-16 19:20:11"
Code: 43
"DateTime64(3, 'Asia/Istanbul')","2019-06-16 19:20:11.234"
------------------------------------------
SELECT CAST(N as DateTime(\'Europe/Minsk\'))
"DateTime('Europe/Minsk')","2019-09-16 00:00:00"

View File

@ -14,7 +14,7 @@ export REPLICAS_TO_DROP
for i in $(seq $TOTAL_REPLICAS); do
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS test_table_$i"
$CLICKHOUSE_CLIENT --query "CREATE TABLE test_table_$i (key UInt64, value UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_table', '$i') ORDER BY key"
$CLICKHOUSE_CLIENT --query "CREATE TABLE test_table_$i (key UInt64, value UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_table', '$i') ORDER BY key SETTINGS old_parts_lifetime=1"
done
function insert_thread() {
@ -35,7 +35,7 @@ function sync_and_drop_replicas() {
done
for i in $(seq $REPLICAS_TO_DROP); do
$CLICKHOUSE_CLIENT --query "CREATE TABLE test_table_$i (key UInt64, value UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_table', '$i') ORDER BY key"
$CLICKHOUSE_CLIENT --query "CREATE TABLE test_table_$i (key UInt64, value UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_table', '$i') ORDER BY key SETTINGS old_parts_lifetime=1"
done
done
}
@ -87,4 +87,4 @@ for i in $(seq $TOTAL_REPLICAS); do
if [ $i -gt $REPLICAS_TO_DROP ]; then
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS test_table_$i"
fi
done
done

View File

@ -0,0 +1,10 @@
00000000-0000-0000-0000-000000000000 1231_1_1_0
00000000-0000-0000-0000-000000000000 6666_2_2_0
00000000-0000-0000-0000-000000000000 1231_1_1_0 users
00000000-0000-0000-0000-000000000000 6666_2_2_0 users
00000000-0000-0000-0000-000000000000 1231_1_1_0 users uid
00000000-0000-0000-0000-000000000000 1231_1_1_0 users name
00000000-0000-0000-0000-000000000000 1231_1_1_0 users age
00000000-0000-0000-0000-000000000000 6666_2_2_0 users uid
00000000-0000-0000-0000-000000000000 6666_2_2_0 users name
00000000-0000-0000-0000-000000000000 6666_2_2_0 users age

View File

@ -0,0 +1,11 @@
DROP TABLE IF EXISTS users;
CREATE TABLE users (uid Int16, name String, age Int16) ENGINE=MergeTree ORDER BY uid PARTITION BY uid;
INSERT INTO users VALUES (1231, 'John', 33);
INSERT INTO users VALUES (6666, 'Ksenia', 48);
SELECT uuid, name from system.parts WHERE database = currentDatabase() AND table = 'users';
SELECT uuid, name, table from system.parts WHERE database = currentDatabase() AND table = 'users' AND uuid = '00000000-0000-0000-0000-000000000000';
SELECT uuid, name, table, column from system.parts_columns WHERE database = currentDatabase() AND table = 'users' AND uuid = '00000000-0000-0000-0000-000000000000';
DROP TABLE IF EXISTS users;

View File

@ -0,0 +1 @@
2024-01-11 00:00:00.000000

View File

@ -0,0 +1 @@
select addDays(toDateTime64('2024-01-01', 6, 'Asia/Shanghai'), 10, 'Asia/Shanghai');

View File

@ -0,0 +1 @@
SELECT parseDateTimeBestEffort(toFixedString('01/12/2017,', 11)); -- { serverError CANNOT_PARSE_DATETIME }

View File

@ -14,15 +14,3 @@ JSONCompactEachRowWithNames, false
e3231b1c8187de4da6752d692b2ddba9 -
JSONCompactEachRowWithNames, true
e3231b1c8187de4da6752d692b2ddba9 -
JSONCompactStringsEachRowWithNames, false
e3231b1c8187de4da6752d692b2ddba9 -
JSONCompactStringsEachRowWithNames, true
e3231b1c8187de4da6752d692b2ddba9 -
JSONCompactEachRowWithNamesAndTypes, false
d40c4327c63eded184eee185a5330e12 -
JSONCompactEachRowWithNamesAndTypes, true
d40c4327c63eded184eee185a5330e12 -
JSONCompactStringsEachRowWithNamesAndTypes, false
d40c4327c63eded184eee185a5330e12 -
JSONCompactStringsEachRowWithNamesAndTypes, true
d40c4327c63eded184eee185a5330e12 -

View File

@ -6,7 +6,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
FORMATS=('JSONEachRow' 'JSONCompactEachRow' 'JSONCompactStringsEachRow' 'JSONCompactEachRowWithNames' 'JSONCompactStringsEachRowWithNames' 'JSONCompactEachRowWithNamesAndTypes' 'JSONCompactStringsEachRowWithNamesAndTypes')
FORMATS=('JSONEachRow' 'JSONCompactEachRow' 'JSONCompactStringsEachRow' 'JSONCompactEachRowWithNames')
for format in "${FORMATS[@]}"
do

View File

@ -0,0 +1,12 @@
JSONCompactStringsEachRowWithNames, false
e3231b1c8187de4da6752d692b2ddba9 -
JSONCompactStringsEachRowWithNames, true
e3231b1c8187de4da6752d692b2ddba9 -
JSONCompactEachRowWithNamesAndTypes, false
d40c4327c63eded184eee185a5330e12 -
JSONCompactEachRowWithNamesAndTypes, true
d40c4327c63eded184eee185a5330e12 -
JSONCompactStringsEachRowWithNamesAndTypes, false
d40c4327c63eded184eee185a5330e12 -
JSONCompactStringsEachRowWithNamesAndTypes, true
d40c4327c63eded184eee185a5330e12 -

View File

@ -0,0 +1,20 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
FORMATS=('JSONCompactStringsEachRowWithNames' 'JSONCompactEachRowWithNamesAndTypes' 'JSONCompactStringsEachRowWithNamesAndTypes')
for format in "${FORMATS[@]}"
do
echo "$format, false";
$CLICKHOUSE_CLIENT --max_threads=0 --output_format_parallel_formatting=false -q \
"SELECT ClientEventTime::DateTime('Asia/Dubai') as a, MobilePhoneModel as b, ClientIP6 as c FROM test.hits ORDER BY a, b, c LIMIT 3000000 Format $format" | md5sum
echo "$format, true";
$CLICKHOUSE_CLIENT --max_threads=0 --output_format_parallel_formatting=true -q \
"SELECT ClientEventTime::DateTime('Asia/Dubai') as a, MobilePhoneModel as b, ClientIP6 as c FROM test.hits ORDER BY a, b, c LIMIT 3000000 Format $format" | md5sum
done

View File

@ -1,12 +1,12 @@
TSV, false
194d5061de4cae59489d989373f8effe -
2cc7bfde1a2855814c6ea2c8181679c8 -
TSV, true
194d5061de4cae59489d989373f8effe -
2cc7bfde1a2855814c6ea2c8181679c8 -
TSVWithNames, false
a6d327a3611288b3f973d00e6116f16e -
c4cb6f9c0d77cd76f2584279993b4438 -
TSVWithNames, true
a6d327a3611288b3f973d00e6116f16e -
c4cb6f9c0d77cd76f2584279993b4438 -
TSKV, false
c2e32a21c08aacf60bda21248ce4f73f -
fd9ccbc364c90e1f7682348fe7f11a5a -
TSKV, true
c2e32a21c08aacf60bda21248ce4f73f -
fd9ccbc364c90e1f7682348fe7f11a5a -

View File

@ -11,9 +11,9 @@ for format in "${FORMATS[@]}"
do
echo "$format, false";
$CLICKHOUSE_CLIENT --max_threads=0 --output_format_parallel_formatting=false -q \
"SELECT ClientEventTime::DateTime('Asia/Dubai') as a, MobilePhoneModel as b, ClientIP6 as c FROM test.hits ORDER BY a, b, c Format $format" | md5sum
"SELECT ClientEventTime::DateTime('Asia/Dubai') as a, MobilePhoneModel as b, ClientIP6 as c FROM test.hits ORDER BY a, b, c LIMIT 3000000 Format $format" | md5sum
echo "$format, true";
$CLICKHOUSE_CLIENT --max_threads=0 --output_format_parallel_formatting=true -q \
"SELECT ClientEventTime::DateTime('Asia/Dubai') as a, MobilePhoneModel as b, ClientIP6 as c FROM test.hits ORDER BY a, b, c Format $format" | md5sum
"SELECT ClientEventTime::DateTime('Asia/Dubai') as a, MobilePhoneModel as b, ClientIP6 as c FROM test.hits ORDER BY a, b, c LIMIT 3000000 Format $format" | md5sum
done