Merge branch 'master' of github.com:yandex/ClickHouse

commit ae401708ad
Author: Alexey Milovidov
Date: 2018-03-03 22:44:50 +03:00
31 changed files with 199 additions and 164 deletions

contrib/poco vendored

@ -1 +1 @@
Subproject commit cf1ad2e9a30ee9161772dc7bc9bf6e165cc51768
Subproject commit 8238852d7ab2a4abdf87adff233b3b83686f4fe4

View File

@ -206,9 +206,7 @@ template <> struct AggregateFunctionUniqTraits<UInt128>
{
static UInt64 hash(UInt128 x)
{
SipHash hash;
hash.update(reinterpret_cast<const char *>(&x), sizeof(x));
return hash.get64();
return sipHash64(x);
}
};
@ -243,9 +241,7 @@ template <> struct AggregateFunctionUniqCombinedTraits<UInt128>
{
static UInt32 hash(UInt128 x)
{
SipHash hash;
hash.update(reinterpret_cast<const char *>(&x), sizeof(x));
return static_cast<UInt32>(hash.get64());
return sipHash64(x);
}
};

View File

@ -118,9 +118,7 @@ struct AggregateFunctionUniqUpToData<UInt128> : AggregateFunctionUniqUpToData<UI
void add(const IColumn & column, size_t row_num, UInt8 threshold)
{
UInt128 value = static_cast<const ColumnVector<UInt128> &>(column).getData()[row_num];
SipHash hash;
hash.update(reinterpret_cast<const char *>(&value), sizeof(value));
insert(hash.get64(), threshold);
insert(sipHash64(value), threshold);
}
};

View File

@ -202,7 +202,7 @@ void ColumnArray::updateHashWithValue(size_t n, SipHash & hash) const
size_t array_size = sizeAt(n);
size_t offset = offsetAt(n);
hash.update(reinterpret_cast<const char *>(&array_size), sizeof(array_size));
hash.update(array_size);
for (size_t i = 0; i < array_size; ++i)
getData().updateHashWithValue(offset + i, hash);
}

View File

@ -36,7 +36,7 @@ ColumnNullable::ColumnNullable(const ColumnPtr & nested_column_, const ColumnPtr
void ColumnNullable::updateHashWithValue(size_t n, SipHash & hash) const
{
const auto & arr = getNullMapData();
hash.update(reinterpret_cast<const char *>(&arr[n]), sizeof(arr[0]));
hash.update(arr[n]);
if (arr[n] == 0)
getNestedColumn().updateHashWithValue(n, hash);
}

View File

@ -48,7 +48,7 @@ const char * ColumnVector<T>::deserializeAndInsertFromArena(const char * pos)
template <typename T>
void ColumnVector<T>::updateHashWithValue(size_t n, SipHash & hash) const
{
hash.update(reinterpret_cast<const char *>(&data[n]), sizeof(T));
hash.update(data[n]);
}
template <typename T>

View File

@ -24,7 +24,11 @@ ConfigReloader::ConfigReloader(
{
if (!already_loaded)
reloadIfNewer(/* force = */ true, /* throw_on_error = */ true, /* fallback_to_preprocessed = */ true);
}
void ConfigReloader::start()
{
thread = std::thread(&ConfigReloader::run, this);
}
@ -35,6 +39,8 @@ ConfigReloader::~ConfigReloader()
{
quit = true;
zk_node_cache.getChangedEvent().set();
if (thread.joinable())
thread.join();
}
catch (...)

View File

@ -39,6 +39,9 @@ public:
~ConfigReloader();
/// Call this method to run the background thread.
void start();
private:
void run();
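
The change above splits thread creation out of the constructor: the constructor may still do a synchronous first load, but the background thread only starts once start() is called, and the destructor joins only if the thread was actually started. A minimal sketch of the same lifecycle (class and method names here are illustrative, not the real ConfigReloader):

```cpp
#include <atomic>
#include <chrono>
#include <thread>

/// Illustrative reloader skeleton (not the real ConfigReloader): the
/// constructor can still perform a synchronous initial load, but the
/// background thread is launched only by an explicit start(), so the thread
/// can never observe a partially constructed object.
class Reloader
{
public:
    Reloader() { /* synchronous reloadIfNewer(force=true, ...) would go here */ }

    void start() { thread = std::thread(&Reloader::run, this); }

    ~Reloader()
    {
        quit = true;
        if (thread.joinable())  /// start() may never have been called
            thread.join();
    }

private:
    void run()
    {
        while (!quit)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            /// periodic reloadIfNewer(...) would go here
        }
    }

    std::atomic<bool> quit{false};
    std::thread thread;
};

int main()
{
    Reloader reloader;
    reloader.start();  /// caller decides when the background work begins
}   /// destructor sets quit and joins
```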

View File

@ -145,45 +145,43 @@ FieldVisitorHash::FieldVisitorHash(SipHash & hash) : hash(hash) {}
void FieldVisitorHash::operator() (const Null &) const
{
UInt8 type = Field::Types::Null;
hash.update(reinterpret_cast<const char *>(&type), sizeof(type));
hash.update(type);
}
void FieldVisitorHash::operator() (const UInt64 & x) const
{
UInt8 type = Field::Types::UInt64;
hash.update(reinterpret_cast<const char *>(&type), sizeof(type));
hash.update(reinterpret_cast<const char *>(&x), sizeof(x));
hash.update(type);
hash.update(x);
}
void FieldVisitorHash::operator() (const Int64 & x) const
{
UInt8 type = Field::Types::Int64;
hash.update(reinterpret_cast<const char *>(&type), sizeof(type));
hash.update(reinterpret_cast<const char *>(&x), sizeof(x));
hash.update(type);
hash.update(x);
}
void FieldVisitorHash::operator() (const Float64 & x) const
{
UInt8 type = Field::Types::Float64;
hash.update(reinterpret_cast<const char *>(&type), sizeof(type));
hash.update(reinterpret_cast<const char *>(&x), sizeof(x));
hash.update(type);
hash.update(x);
}
void FieldVisitorHash::operator() (const String & x) const
{
UInt8 type = Field::Types::String;
hash.update(reinterpret_cast<const char *>(&type), sizeof(type));
size_t size = x.size();
hash.update(reinterpret_cast<const char *>(&size), sizeof(size));
hash.update(type);
hash.update(x.size());
hash.update(x.data(), x.size());
}
void FieldVisitorHash::operator() (const Array & x) const
{
UInt8 type = Field::Types::Array;
hash.update(reinterpret_cast<const char *>(&type), sizeof(type));
size_t size = x.size();
hash.update(reinterpret_cast<const char *>(&size), sizeof(size));
hash.update(type);
hash.update(x.size());
for (const auto & elem : x)
applyVisitor(*this, elem);
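
FieldVisitorHash hashes a type tag before each value and a length prefix before variable-size payloads, so differently typed or differently split values cannot produce the same byte stream. A minimal sketch of that scheme; ByteHasher and the Tag values are illustrative stand-ins (the real code feeds SipHash and uses Field::Types):

```cpp
#include <cstddef>
#include <cstdint>
#include <string>

/// Illustrative byte sink; the real code feeds these bytes into SipHash.
struct ByteHasher
{
    uint64_t state = 1469598103934665603ULL;

    void update(const void * data, size_t size)
    {
        const auto * p = static_cast<const unsigned char *>(data);
        for (size_t i = 0; i < size; ++i)
            state = (state ^ p[i]) * 1099511628211ULL;  /// FNV-1a, as a stand-in
    }

    template <typename T>
    void update(const T & x) { update(&x, sizeof(x)); }
};

/// Hypothetical tag values; the real code uses Field::Types.
enum class Tag : uint8_t { UInt64 = 1, String = 16 };

void hashUInt64(uint64_t x, ByteHasher & hash)
{
    hash.update(static_cast<uint8_t>(Tag::UInt64));  /// type tag first
    hash.update(x);
}

void hashString(const std::string & s, ByteHasher & hash)
{
    hash.update(static_cast<uint8_t>(Tag::String));
    hash.update(s.size());            /// length prefix: ("ab","c") and
    hash.update(s.data(), s.size());  /// ("a","bc") must hash differently
}

int main()
{
    ByteHasher h1, h2;
    hashString("ab", h1); hashString("c", h1);
    hashString("a", h2);  hashString("bc", h2);
    return h1.state != h2.state ? 0 : 1;
}
```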

View File

@ -14,6 +14,7 @@
*/
#include <common/Types.h>
#include <type_traits>
#define ROTL(x, b) static_cast<UInt64>(((x) << (b)) | ((x) >> (64 - (b))))
@ -130,6 +131,13 @@ public:
}
}
/// NOTE: std::has_unique_object_representations is only available since clang 6. As of Mar 2018 we still use clang 5 sometimes.
template <typename T>
std::enable_if_t<std::/*has_unique_object_representations_v*/is_standard_layout_v<T>, void> update(const T & x)
{
update(reinterpret_cast<const char *>(&x), sizeof(x));
}
/// Get the result in some form. This can only be done once!
void get128(char * out)
@ -173,6 +181,14 @@ inline UInt64 sipHash64(const char * data, const size_t size)
return hash.get64();
}
template <typename T>
std::enable_if_t<std::/*has_unique_object_representations_v*/is_standard_layout_v<T>, UInt64> sipHash64(const T & x)
{
SipHash hash;
hash.update(x);
return hash.get64();
}
#include <string>
inline UInt64 sipHash64(const std::string & s)
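
The new templated update() and sipHash64() overloads accept any standard-layout type and hash its object bytes, collapsing the reinterpret_cast boilerplate at every call site in the hunks above. A sketch of the same SFINAE constraint with a placeholder mixer instead of real SipHash rounds; note that is_standard_layout (unlike the commented-out has_unique_object_representations) admits types with padding bytes, which must be kept zeroed for deterministic hashes:

```cpp
#include <cstddef>
#include <cstdint>
#include <type_traits>

/// Placeholder hasher, not real SipHash rounds.
struct Hasher
{
    uint64_t state = 0;

    void update(const char * data, size_t size)
    {
        for (size_t i = 0; i < size; ++i)  /// placeholder byte mixing
            state = state * 1099511628211ULL ^ static_cast<unsigned char>(data[i]);
    }

    /// The convenience overload added above: hash any standard-layout value.
    template <typename T>
    std::enable_if_t<std::is_standard_layout_v<T>, void> update(const T & x)
    {
        update(reinterpret_cast<const char *>(&x), sizeof(x));
    }

    uint64_t get64() const { return state; }
};

template <typename T>
std::enable_if_t<std::is_standard_layout_v<T>, uint64_t> hash64(const T & x)
{
    Hasher hash;
    hash.update(x);  /// one line instead of reinterpret_cast + sizeof at each call site
    return hash.get64();
}

int main()
{
    uint64_t value = 42;
    return hash64(value) == hash64(value) ? 0 : 1;
}
```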

View File

@ -4,6 +4,7 @@
#include <Common/Exception.h>
#include <Common/randomSeed.h>
#include <Common/SipHash.h>
#include <Core/Types.h>
#ifdef __APPLE__
@ -25,5 +26,13 @@ DB::UInt64 randomSeed()
struct timespec times;
if (clock_gettime(CLOCK_THREAD_CPUTIME_ID, &times))
DB::throwFromErrno("Cannot clock_gettime.", DB::ErrorCodes::CANNOT_CLOCK_GETTIME);
return times.tv_nsec + times.tv_sec + getpid();
/// Not cryptographically secure as time, pid and stack address can be predictable.
SipHash hash;
hash.update(times.tv_nsec);
hash.update(times.tv_sec);
hash.update(getpid());
hash.update(&times);
return hash.get64();
}
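
The old seed simply added tv_nsec + tv_sec + pid, which collides easily for processes started in the same tick; the new version feeds each source, plus a stack address, through SipHash so any differing bit perturbs the whole seed. A sketch of that idea, assuming a POSIX system and using a placeholder mixer in place of SipHash:

```cpp
#include <cstdint>
#include <ctime>
#include <random>
#include <unistd.h>  /// getpid(); assumes a POSIX system, as the original does

/// Placeholder mixer standing in for SipHash.
static uint64_t mix(uint64_t h, uint64_t v)
{
    h ^= v + 0x9e3779b97f4a7c15ULL + (h << 6) + (h >> 2);
    return h;
}

uint64_t randomSeedSketch()
{
    timespec times{};
    clock_gettime(CLOCK_MONOTONIC, &times);  /// the original uses CLOCK_THREAD_CPUTIME_ID
    uint64_t h = 0;
    h = mix(h, static_cast<uint64_t>(times.tv_nsec));
    h = mix(h, static_cast<uint64_t>(times.tv_sec));
    h = mix(h, static_cast<uint64_t>(getpid()));
    h = mix(h, reinterpret_cast<uintptr_t>(&times));  /// stack address: ASLR adds a little entropy
    return h;  /// still not cryptographically secure, as the original comment notes
}

int main()
{
    std::mt19937_64 rng(randomSeedSketch());
    return rng() ? 0 : 1;
}
```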

View File

@ -201,7 +201,7 @@ namespace Hashes
size_t operator()(Key x) const
{
::SipHash hash;
hash.update(reinterpret_cast<const char *>(&x), sizeof(x));
hash.update(x);
return hash.get64();
}
};

View File

@ -51,7 +51,7 @@ public:
SipHash hash;
hash.update(path_to_file.data(), path_to_file.size() + 1);
hash.update(reinterpret_cast<const char *>(&offset), sizeof(offset));
hash.update(offset);
hash.get128(key.low, key.high);
return key;

View File

@ -59,7 +59,7 @@ static Compiler::HashedKey getHash(const std::string & key)
SipHash hash;
auto revision = ClickHouseRevision::get();
hash.update(reinterpret_cast<const char *>(&revision), sizeof(revision));
hash.update(revision);
hash.update(key.data(), key.size());
Compiler::HashedKey res;

View File

@ -75,10 +75,7 @@ void IAST::getTreeHashImpl(SipHash & hash_state) const
{
auto id = getID();
hash_state.update(id.data(), id.size());
size_t num_children = children.size();
hash_state.update(reinterpret_cast<const char *>(&num_children), sizeof(num_children));
hash_state.update(children.size());
for (const auto & child : children)
child->getTreeHashImpl(hash_state);
}

View File

@ -473,6 +473,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
for (auto & server : servers)
server->start();
main_config_reloader->start();
users_config_reloader->start();
{
std::stringstream message;
message << "Available RAM = " << formatReadableSizeWithBinarySuffix(getMemoryAmount()) << ";"

View File

@ -35,7 +35,7 @@ MergeInfo MergeListElement::getInfo() const
res.table = table;
res.result_part_name = result_part_name;
res.elapsed = watch.elapsedSeconds();
res.progress = progress;
res.progress = progress.load(std::memory_order_relaxed);
res.num_parts = num_parts;
res.total_size_bytes_compressed = total_size_bytes_compressed;
res.total_size_marks = total_size_marks;

View File

@ -50,7 +50,7 @@ struct MergeListElement : boost::noncopyable
const std::string table;
const std::string result_part_name;
Stopwatch watch;
Float64 progress{};
std::atomic<Float64> progress{};
UInt64 num_parts{};
Names source_part_names;
UInt64 total_size_bytes_compressed{};
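
progress is written by the merging thread and read concurrently by MergeListElement::getInfo(); as a plain Float64 that was a data race with possible torn reads. std::atomic<Float64> with relaxed ordering fixes it: no ordering is needed because nothing else is synchronized through this variable, atomicity alone suffices. A minimal sketch:

```cpp
#include <atomic>
#include <iostream>
#include <thread>

/// Sketch: one writer thread updates progress, another reads it for display.
/// Relaxed ordering is enough because no other data is published through this
/// variable; atomicity alone prevents torn reads of the 8-byte double.
struct MergeStatus
{
    std::atomic<double> progress{0.0};
};

int main()
{
    MergeStatus status;

    std::thread merger([&]
    {
        for (int i = 1; i <= 1000; ++i)
            status.progress.store(i / 1000.0, std::memory_order_relaxed);
    });

    std::thread monitor([&]
    {
        double p = 0.0;
        while (p < 1.0)
            p = status.progress.load(std::memory_order_relaxed);
        std::cout << "done: " << p << '\n';
    });

    merger.join();
    monitor.join();
}
```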

View File

@ -561,7 +561,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
auto deactivate_part = [&] (DataPartIteratorByStateAndInfo it)
{
(*it)->remove_time = (*it)->modification_time;
(*it)->remove_time.store((*it)->modification_time, std::memory_order_relaxed);
modifyPartState(it, DataPartState::Outdated);
};
@ -677,9 +677,11 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
{
const DataPartPtr & part = *it;
auto part_remove_time = part->remove_time.load(std::memory_order_relaxed);
if (part.unique() && /// Grab only parts that are not used by anyone (SELECTs for example).
part->remove_time < now &&
now - part->remove_time > settings.old_parts_lifetime.totalSeconds())
part_remove_time < now &&
now - part_remove_time > settings.old_parts_lifetime.totalSeconds())
{
parts_to_delete.emplace_back(it);
}
@ -1519,7 +1521,7 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
auto current_time = time(nullptr);
for (const DataPartPtr & covered_part : covered_parts)
{
covered_part->remove_time = current_time;
covered_part->remove_time.store(current_time, std::memory_order_relaxed);
modifyPartState(covered_part, DataPartState::Outdated);
removePartContributionToColumnSizes(covered_part);
}
@ -1550,7 +1552,7 @@ void MergeTreeData::removePartsFromWorkingSet(const DataPartsVector & remove, bo
removePartContributionToColumnSizes(part);
modifyPartState(part, DataPartState::Outdated);
part->remove_time = remove_time;
part->remove_time.store(remove_time, std::memory_order_relaxed);
}
}
@ -2175,7 +2177,7 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit()
LOG_WARNING(data->log, "Tried to commit obsolete part " << part->name
<< " covered by " << covering_part->getNameWithState());
part->remove_time = 0; /// The part will be removed without waiting for old_parts_lifetime seconds.
part->remove_time.store(0, std::memory_order_relaxed); /// The part will be removed without waiting for old_parts_lifetime seconds.
data->modifyPartState(part, DataPartState::Outdated);
}
else
@ -2183,7 +2185,7 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit()
total_covered_parts.insert(total_covered_parts.end(), covered_parts.begin(), covered_parts.end());
for (const DataPartPtr & covered_part : covered_parts)
{
covered_part->remove_time = current_time;
covered_part->remove_time.store(current_time, std::memory_order_relaxed);
data->modifyPartState(covered_part, DataPartState::Outdated);
data->removePartContributionToColumnSizes(covered_part);
}
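
Besides making remove_time atomic, the grabOldParts() change loads it once into a local, so both comparisons in the condition see the same snapshot even if another thread stores a new value in between (the old code read the field twice). A sketch of that pattern:

```cpp
#include <atomic>
#include <ctime>

/// Load the atomic once and reuse the local value, so the two comparisons
/// cannot disagree under concurrent updates.
bool isOldEnough(const std::atomic<time_t> & remove_time, time_t now, time_t lifetime_seconds)
{
    time_t part_remove_time = remove_time.load(std::memory_order_relaxed);
    return part_remove_time < now && now - part_remove_time > lifetime_seconds;
}

int main()
{
    std::atomic<time_t> remove_time{100};
    return isOldEnough(remove_time, 1000, 300) ? 0 : 1;
}
```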

View File

@ -466,7 +466,7 @@ public:
merge_entry->bytes_read_uncompressed += value.bytes;
merge_entry->rows_read += value.rows;
merge_entry->progress = average_elem_progress * merge_entry->rows_read;
merge_entry->progress.store(average_elem_progress * merge_entry->rows_read, std::memory_order_relaxed);
};
};
@ -476,10 +476,9 @@ public:
class MergeProgressCallbackVerticalStep : public MergeProgressCallback
{
public:
MergeProgressCallbackVerticalStep(MergeList::Entry & merge_entry_, size_t num_total_rows_exact,
const ColumnSizeEstimator & column_sizes, const String & column_name, UInt64 & watch_prev_elapsed_)
: MergeProgressCallback(merge_entry_, watch_prev_elapsed_), initial_progress(merge_entry->progress)
: MergeProgressCallback(merge_entry_, watch_prev_elapsed_), initial_progress(merge_entry->progress.load(std::memory_order_relaxed))
{
average_elem_progress = column_sizes.columnProgress(column_name, 1, num_total_rows_exact);
updateWatch();
@ -496,7 +495,9 @@ public:
rows_read_internal += value.rows;
Float64 local_progress = average_elem_progress * rows_read_internal;
merge_entry->progress = initial_progress + local_progress;
/// NOTE: 'progress' is modified by a single thread, but it may be concurrently read from MergeListElement::getInfo() (StorageSystemMerges).
merge_entry->progress.store(initial_progress + local_progress, std::memory_order_relaxed);
};
};
@ -678,7 +679,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
/// But now we are using inaccurate row-based estimation in Horizontal case for backward compatibility
Float64 progress = (merge_alg == MergeAlgorithm::Horizontal)
? std::min(1., 1. * rows_written / sum_input_rows_upper_bound)
: std::min(1., merge_entry->progress);
: std::min(1., merge_entry->progress.load(std::memory_order_relaxed));
disk_reservation->update(static_cast<size_t>((1. - progress) * initial_reservation));
}
@ -696,7 +697,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
{
size_t sum_input_rows_exact = merge_entry->rows_read;
merge_entry->columns_written = merging_column_names.size();
merge_entry->progress = column_sizes.keyColumnsProgress(sum_input_rows_exact, sum_input_rows_exact);
merge_entry->progress.store(column_sizes.keyColumnsProgress(sum_input_rows_exact, sum_input_rows_exact), std::memory_order_relaxed);
BlockInputStreams column_part_streams(parts.size());
NameSet offset_columns_written;
@ -715,7 +716,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
const DataTypePtr & column_type = it_name_and_type->type;
const String offset_column_name = Nested::extractTableName(column_name);
Names column_name_{column_name};
Float64 progress_before = merge_entry->progress;
Float64 progress_before = merge_entry->progress.load(std::memory_order_relaxed);
bool offset_written = offset_columns_written.count(offset_column_name);
for (size_t part_num = 0; part_num < parts.size(); ++part_num)
@ -753,9 +754,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
if (typeid_cast<const DataTypeArray *>(column_type.get()))
offset_columns_written.emplace(offset_column_name);
/// NOTE: 'progress' is modified by a single thread, but it may be concurrently read from MergeListElement::getInfo() (StorageSystemMerges).
merge_entry->columns_written = merging_column_names.size() + column_num;
merge_entry->bytes_written_uncompressed += column_gathered_stream.getProfileInfo().bytes;
merge_entry->progress = progress_before + column_sizes.columnProgress(column_name, sum_input_rows_exact, sum_input_rows_exact);
merge_entry->progress.store(progress_before + column_sizes.columnProgress(column_name, sum_input_rows_exact, sum_input_rows_exact), std::memory_order_relaxed);
if (merges_blocker.isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);

View File

@ -20,7 +20,7 @@
#include <common/logger_useful.h>
#define MERGE_TREE_MARK_SIZE (2 * sizeof(size_t))
#define MERGE_TREE_MARK_SIZE (2 * sizeof(UInt64))
namespace DB
@ -64,7 +64,7 @@ void MergeTreeDataPartChecksum::checkSize(const String & path) const
Poco::File file(path);
if (!file.exists())
throw Exception(path + " doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
size_t size = file.getSize();
UInt64 size = file.getSize();
if (size != file_size)
throw Exception(path + " has unexpected size: " + toString(size) + " instead of " + toString(file_size),
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
@ -223,7 +223,7 @@ void MergeTreeDataPartChecksums::write(WriteBuffer & to) const
}
}
void MergeTreeDataPartChecksums::addFile(const String & file_name, size_t file_size, MergeTreeDataPartChecksum::uint128 file_hash)
void MergeTreeDataPartChecksums::addFile(const String & file_name, UInt64 file_size, MergeTreeDataPartChecksum::uint128 file_hash)
{
files[file_name] = Checksum(file_size, file_hash);
}
@ -248,11 +248,11 @@ void MergeTreeDataPartChecksums::summaryDataChecksum(SipHash & hash) const
if (!endsWith(name, ".bin"))
continue;
size_t len = name.size();
hash.update(reinterpret_cast<const char *>(&len), sizeof(len));
UInt64 len = name.size();
hash.update(len);
hash.update(name.data(), len);
hash.update(reinterpret_cast<const char *>(&sum.uncompressed_size), sizeof(sum.uncompressed_size));
hash.update(reinterpret_cast<const char *>(&sum.uncompressed_hash), sizeof(sum.uncompressed_hash));
hash.update(sum.uncompressed_size);
hash.update(sum.uncompressed_hash);
}
}
@ -391,7 +391,7 @@ MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const String & na
}
/// Returns the size of .bin file for column `name` if found, zero otherwise.
size_t MergeTreeDataPart::getColumnCompressedSize(const String & name) const
UInt64 MergeTreeDataPart::getColumnCompressedSize(const String & name) const
{
const Checksum * checksum = tryGetBinChecksum(name);
@ -399,14 +399,14 @@ size_t MergeTreeDataPart::getColumnCompressedSize(const String & name) const
return checksum ? checksum->file_size : 0;
}
size_t MergeTreeDataPart::getColumnUncompressedSize(const String & name) const
UInt64 MergeTreeDataPart::getColumnUncompressedSize(const String & name) const
{
const Checksum * checksum = tryGetBinChecksum(name);
return checksum ? checksum->uncompressed_size : 0;
}
size_t MergeTreeDataPart::getColumnMrkSize(const String & name) const
UInt64 MergeTreeDataPart::getColumnMrkSize(const String & name) const
{
const Checksum * checksum = tryGetMrkChecksum(name);
return checksum ? checksum->file_size : 0;
@ -420,7 +420,7 @@ String MergeTreeDataPart::getColumnNameWithMinumumCompressedSize() const
{
const auto & columns = storage.getColumnsList();
const std::string * minimum_size_column = nullptr;
size_t minimum_size = std::numeric_limits<size_t>::max();
UInt64 minimum_size = std::numeric_limits<UInt64>::max();
for (const auto & column : columns)
{
@ -507,14 +507,14 @@ MergeTreeDataPart::~MergeTreeDataPart()
}
}
size_t MergeTreeDataPart::calculateTotalSize(const String & from)
UInt64 MergeTreeDataPart::calculateTotalSize(const String & from)
{
Poco::File cur(from);
if (cur.isFile())
return cur.getSize();
std::vector<std::string> files;
cur.list(files);
size_t res = 0;
UInt64 res = 0;
for (const auto & file : files)
res += calculateTotalSize(from + file);
return res;
@ -886,7 +886,7 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
/// Check that all marks are nonempty and have the same size.
std::optional<size_t> marks_size;
std::optional<UInt64> marks_size;
for (const NameAndTypePair & name_type : columns)
{
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
@ -896,7 +896,7 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
/// Missing file is Ok for case when new column was added.
if (file.exists())
{
size_t file_size = file.getSize();
UInt64 file_size = file.getSize();
if (!file_size)
throw Exception("Part " + path + " is broken: " + file.path() + " is empty.",
@ -926,25 +926,25 @@ bool MergeTreeDataPart::hasColumnFiles(const String & column) const
}
size_t MergeTreeDataPart::getIndexSizeInBytes() const
UInt64 MergeTreeDataPart::getIndexSizeInBytes() const
{
size_t res = 0;
UInt64 res = 0;
for (const ColumnPtr & column : index)
res += column->byteSize();
return res;
}
size_t MergeTreeDataPart::getIndexSizeInAllocatedBytes() const
UInt64 MergeTreeDataPart::getIndexSizeInAllocatedBytes() const
{
size_t res = 0;
UInt64 res = 0;
for (const ColumnPtr & column : index)
res += column->allocatedBytes();
return res;
}
size_t MergeTreeDataPart::getTotalMrkSizeInBytes() const
UInt64 MergeTreeDataPart::getTotalMrkSizeInBytes() const
{
size_t res = 0;
UInt64 res = 0;
for (const NameAndTypePair & it : columns)
{
const Checksum * checksum = tryGetMrkChecksum(it.name);
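
The size_t to UInt64 sweep in this file matters because these sizes end up in checksums and on-disk metadata: size_t is 32 bits on 32-bit platforms, so the same part could serialize and hash differently there. A trivial illustration:

```cpp
#include <cstdint>
#include <cstdio>

int main()
{
    /// On 32-bit platforms size_t is 4 bytes, so sizes written into checksum
    /// files or fed into hashes as raw bytes would differ from 64-bit builds.
    std::printf("sizeof(size_t) = %zu, sizeof(uint64_t) = %zu\n",
                sizeof(size_t), sizeof(uint64_t));
}
```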

View File

@ -21,16 +21,16 @@ struct MergeTreeDataPartChecksum
{
using uint128 = CityHash_v1_0_2::uint128;
size_t file_size {};
UInt64 file_size {};
uint128 file_hash {};
bool is_compressed = false;
size_t uncompressed_size {};
UInt64 uncompressed_size {};
uint128 uncompressed_hash {};
MergeTreeDataPartChecksum() {}
MergeTreeDataPartChecksum(size_t file_size_, uint128 file_hash_) : file_size(file_size_), file_hash(file_hash_) {}
MergeTreeDataPartChecksum(size_t file_size_, uint128 file_hash_, size_t uncompressed_size_, uint128 uncompressed_hash_)
MergeTreeDataPartChecksum(UInt64 file_size_, uint128 file_hash_) : file_size(file_size_), file_hash(file_hash_) {}
MergeTreeDataPartChecksum(UInt64 file_size_, uint128 file_hash_, UInt64 uncompressed_size_, uint128 uncompressed_hash_)
: file_size(file_size_), file_hash(file_hash_), is_compressed(true),
uncompressed_size(uncompressed_size_), uncompressed_hash(uncompressed_hash_) {}
@ -50,7 +50,7 @@ struct MergeTreeDataPartChecksums
using FileChecksums = std::map<String, Checksum>;
FileChecksums files;
void addFile(const String & file_name, size_t file_size, Checksum::uint128 file_hash);
void addFile(const String & file_name, UInt64 file_size, Checksum::uint128 file_hash);
void add(MergeTreeDataPartChecksums && rhs_checksums);
@ -104,10 +104,10 @@ struct MergeTreeDataPart
const Checksum * tryGetMrkChecksum(const String & name) const;
/// Returns the size of .bin file for column `name` if found, zero otherwise
size_t getColumnCompressedSize(const String & name) const;
size_t getColumnUncompressedSize(const String & name) const;
UInt64 getColumnCompressedSize(const String & name) const;
UInt64 getColumnUncompressedSize(const String & name) const;
/// Returns the size of .mrk file for column `name` if found, zero otherwise
size_t getColumnMrkSize(const String & name) const;
UInt64 getColumnMrkSize(const String & name) const;
/// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
/// If no checksums are present returns the name of the first physically existing column.
@ -136,10 +136,10 @@ struct MergeTreeDataPart
size_t rows_count = 0;
size_t marks_count = 0;
std::atomic<size_t> size_in_bytes {0}; /// size in bytes, 0 - if not counted;
std::atomic<UInt64> size_in_bytes {0}; /// size in bytes, 0 - if not counted;
/// is used from several threads without locks (it is changed with ALTER).
time_t modification_time = 0;
mutable time_t remove_time = std::numeric_limits<time_t>::max(); /// When the part is removed from the working set.
mutable std::atomic<time_t> remove_time { std::numeric_limits<time_t>::max() }; /// When the part is removed from the working set. Changes once.
/// If true, the destructor will delete the directory with the part.
bool is_temp = false;
@ -255,7 +255,7 @@ struct MergeTreeDataPart
/// Columns description.
NamesAndTypesList columns;
using ColumnToSize = std::map<std::string, size_t>;
using ColumnToSize = std::map<std::string, UInt64>;
/** It is blocked for writing when changing columns, checksums or any part files.
* Locked to read when reading columns, checksums or any part files.
@ -275,7 +275,7 @@ struct MergeTreeDataPart
~MergeTreeDataPart();
/// Calculate the total size of the entire directory with all the files
static size_t calculateTotalSize(const String & from);
static UInt64 calculateTotalSize(const String & from);
void remove() const;
@ -297,10 +297,10 @@ struct MergeTreeDataPart
bool hasColumnFiles(const String & column) const;
/// For data in RAM ('index')
size_t getIndexSizeInBytes() const;
size_t getIndexSizeInAllocatedBytes() const;
UInt64 getIndexSizeInBytes() const;
UInt64 getIndexSizeInAllocatedBytes() const;
/// Total size of *.mrk files
size_t getTotalMrkSizeInBytes() const;
UInt64 getTotalMrkSizeInBytes() const;
private:
/// Reads columns names and types from columns.txt

View File

@ -31,7 +31,7 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
LOG_DEBUG(log, "Loading queue from " << queue_path);
bool updated = false;
bool min_unprocessed_insert_time_changed = false;
std::optional<time_t> min_unprocessed_insert_time_changed;
{
std::lock_guard<std::mutex> lock(mutex);
@ -65,17 +65,12 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
LogEntryPtr entry = LogEntry::parse(res.value, res.stat);
entry->znode_name = future.first;
time_t prev_min_unprocessed_insert_time = min_unprocessed_insert_time;
insertUnlocked(entry);
insertUnlocked(entry, min_unprocessed_insert_time_changed, lock);
updated = true;
if (min_unprocessed_insert_time != prev_min_unprocessed_insert_time)
min_unprocessed_insert_time_changed = true;
}
}
updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, false);
updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, {});
LOG_TRACE(log, "Loaded queue");
return updated;
@ -96,7 +91,7 @@ void ReplicatedMergeTreeQueue::initialize(
}
void ReplicatedMergeTreeQueue::insertUnlocked(LogEntryPtr & entry)
void ReplicatedMergeTreeQueue::insertUnlocked(LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed, std::lock_guard<std::mutex> &)
{
virtual_parts.add(entry->new_part_name);
queue.push_back(entry);
@ -106,30 +101,32 @@ void ReplicatedMergeTreeQueue::insertUnlocked(LogEntryPtr & entry)
inserts_by_time.insert(entry);
if (entry->create_time && (!min_unprocessed_insert_time || entry->create_time < min_unprocessed_insert_time))
{
min_unprocessed_insert_time = entry->create_time;
min_unprocessed_insert_time_changed = min_unprocessed_insert_time;
}
}
}
void ReplicatedMergeTreeQueue::insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry)
{
time_t prev_min_unprocessed_insert_time;
std::optional<time_t> min_unprocessed_insert_time_changed;
{
std::lock_guard<std::mutex> lock(mutex);
prev_min_unprocessed_insert_time = min_unprocessed_insert_time;
insertUnlocked(entry);
insertUnlocked(entry, min_unprocessed_insert_time_changed, lock);
}
if (min_unprocessed_insert_time != prev_min_unprocessed_insert_time)
updateTimesInZooKeeper(zookeeper, true, false);
updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, {});
}
void ReplicatedMergeTreeQueue::updateTimesOnRemoval(
const LogEntryPtr & entry,
bool & min_unprocessed_insert_time_changed,
bool & max_processed_insert_time_changed)
std::optional<time_t> & min_unprocessed_insert_time_changed,
std::optional<time_t> & max_processed_insert_time_changed,
std::unique_lock<std::mutex> &)
{
if (entry->type != LogEntry::GET_PART)
return;
@ -139,39 +136,40 @@ void ReplicatedMergeTreeQueue::updateTimesOnRemoval(
if (inserts_by_time.empty())
{
min_unprocessed_insert_time = 0;
min_unprocessed_insert_time_changed = true;
min_unprocessed_insert_time_changed = min_unprocessed_insert_time;
}
else if ((*inserts_by_time.begin())->create_time > min_unprocessed_insert_time)
{
min_unprocessed_insert_time = (*inserts_by_time.begin())->create_time;
min_unprocessed_insert_time_changed = true;
min_unprocessed_insert_time_changed = min_unprocessed_insert_time;
}
if (entry->create_time > max_processed_insert_time)
{
max_processed_insert_time = entry->create_time;
max_processed_insert_time_changed = true;
max_processed_insert_time_changed = max_processed_insert_time;
}
}
void ReplicatedMergeTreeQueue::updateTimesInZooKeeper(
zkutil::ZooKeeperPtr zookeeper,
bool min_unprocessed_insert_time_changed,
bool max_processed_insert_time_changed)
std::optional<time_t> min_unprocessed_insert_time_changed,
std::optional<time_t> max_processed_insert_time_changed) const
{
/// Here there can be a race condition (with different remove at the same time).
/// Here there can be a race condition (with different remove at the same time)
/// because we update times in ZooKeeper with unlocked mutex, while these times may change.
/// Consider it unimportant (for a short time, ZK will have a slightly different time value).
/// We also read values of `min_unprocessed_insert_time`, `max_processed_insert_time` without synchronization.
zkutil::Ops ops;
if (min_unprocessed_insert_time_changed)
ops.emplace_back(std::make_unique<zkutil::Op::SetData>(
replica_path + "/min_unprocessed_insert_time", toString(min_unprocessed_insert_time), -1));
replica_path + "/min_unprocessed_insert_time", toString(*min_unprocessed_insert_time_changed), -1));
if (max_processed_insert_time_changed)
ops.emplace_back(std::make_unique<zkutil::Op::SetData>(
replica_path + "/max_processed_insert_time", toString(max_processed_insert_time), -1));
replica_path + "/max_processed_insert_time", toString(*max_processed_insert_time_changed), -1));
if (!ops.empty())
{
@ -193,11 +191,11 @@ void ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, LogEntryPt
LOG_ERROR(log, "Couldn't remove " << replica_path << "/queue/" << entry->znode_name << ": "
<< zkutil::ZooKeeper::error2string(code) << ". This shouldn't happen often.");
bool min_unprocessed_insert_time_changed = false;
bool max_processed_insert_time_changed = false;
std::optional<time_t> min_unprocessed_insert_time_changed;
std::optional<time_t> max_processed_insert_time_changed;
{
std::lock_guard<std::mutex> lock(mutex);
std::unique_lock<std::mutex> lock(mutex);
/// Remove the job from the queue in the RAM.
/// You can not just refer to a pre-saved iterator, because someone else might be able to delete the task.
@ -213,7 +211,7 @@ void ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, LogEntryPt
}
}
updateTimesOnRemoval(entry, min_unprocessed_insert_time_changed, max_processed_insert_time_changed);
updateTimesOnRemoval(entry, min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock);
}
updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed);
@ -224,11 +222,11 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri
{
LogEntryPtr found;
bool min_unprocessed_insert_time_changed = false;
bool max_processed_insert_time_changed = false;
std::optional<time_t> min_unprocessed_insert_time_changed;
std::optional<time_t> max_processed_insert_time_changed;
{
std::lock_guard<std::mutex> lock(mutex);
std::unique_lock<std::mutex> lock(mutex);
for (Queue::iterator it = queue.begin(); it != queue.end();)
{
@ -236,7 +234,7 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri
{
found = *it;
queue.erase(it++);
updateTimesOnRemoval(found, min_unprocessed_insert_time_changed, max_processed_insert_time_changed);
updateTimesOnRemoval(found, min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock);
break;
}
else
@ -329,7 +327,7 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
std::vector<LogEntryPtr> copied_entries;
copied_entries.reserve(end - begin);
bool min_unprocessed_insert_time_changed = false;
std::optional<time_t> min_unprocessed_insert_time_changed;
for (auto & future : futures)
{
@ -346,7 +344,7 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
if (entry.create_time && (!min_unprocessed_insert_time || entry.create_time < min_unprocessed_insert_time))
{
min_unprocessed_insert_time = entry.create_time;
min_unprocessed_insert_time_changed = true;
min_unprocessed_insert_time_changed = min_unprocessed_insert_time;
}
}
}
@ -356,7 +354,7 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
if (min_unprocessed_insert_time_changed)
ops.emplace_back(std::make_unique<zkutil::Op::SetData>(
replica_path + "/min_unprocessed_insert_time", toString(min_unprocessed_insert_time), -1));
replica_path + "/min_unprocessed_insert_time", toString(*min_unprocessed_insert_time_changed), -1));
try
{
@ -384,7 +382,8 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
String path_created = dynamic_cast<zkutil::Op::Create &>(*ops[i]).getPathCreated();
copied_entries[i]->znode_name = path_created.substr(path_created.find_last_of('/') + 1);
insertUnlocked(copied_entries[i]);
std::optional<time_t> unused = false;
insertUnlocked(copied_entries[i], unused, lock);
}
last_queue_update = time(nullptr);
@ -459,8 +458,8 @@ void ReplicatedMergeTreeQueue::removeGetsAndMergesInRange(zkutil::ZooKeeperPtr z
{
Queue to_wait;
size_t removed_entries = 0;
bool min_unprocessed_insert_time_changed = false;
bool max_processed_insert_time_changed = false;
std::optional<time_t> min_unprocessed_insert_time_changed;
std::optional<time_t> max_processed_insert_time_changed;
/// Remove operations with parts, contained in the range to be deleted, from the queue.
std::unique_lock<std::mutex> lock(mutex);
@ -476,7 +475,7 @@ void ReplicatedMergeTreeQueue::removeGetsAndMergesInRange(zkutil::ZooKeeperPtr z
LOG_INFO(log, "Couldn't remove " << replica_path + "/queue/" + (*it)->znode_name << ": "
<< zkutil::ZooKeeper::error2string(code));
updateTimesOnRemoval(*it, min_unprocessed_insert_time_changed, max_processed_insert_time_changed);
updateTimesOnRemoval(*it, min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock);
queue.erase(it++);
++removed_entries;
}
@ -496,7 +495,7 @@ void ReplicatedMergeTreeQueue::removeGetsAndMergesInRange(zkutil::ZooKeeperPtr z
ReplicatedMergeTreeQueue::Queue ReplicatedMergeTreeQueue::getConflictsForClearColumnCommand(
const LogEntry & entry, String * out_conflicts_description)
const LogEntry & entry, String * out_conflicts_description, std::lock_guard<std::mutex> &) const
{
Queue conflicts;
@ -541,7 +540,7 @@ void ReplicatedMergeTreeQueue::disableMergesAndFetchesInRange(const LogEntry & e
std::lock_guard<std::mutex> lock(mutex);
String conflicts_description;
if (!getConflictsForClearColumnCommand(entry, &conflicts_description).empty())
if (!getConflictsForClearColumnCommand(entry, &conflicts_description, lock).empty())
throw Exception(conflicts_description, ErrorCodes::UNFINISHED);
if (!future_parts.count(entry.new_part_name))
@ -549,10 +548,8 @@ void ReplicatedMergeTreeQueue::disableMergesAndFetchesInRange(const LogEntry & e
}
bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & new_part_name, String & out_reason)
bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & new_part_name, String & out_reason, std::lock_guard<std::mutex> &) const
{
/// mutex should been already acquired
/// Let's check if the same part is now being created by another action.
if (future_parts.count(new_part_name))
{
@ -589,7 +586,7 @@ bool ReplicatedMergeTreeQueue::addFuturePartIfNotCoveredByThem(const String & pa
{
std::lock_guard<std::mutex> lock(mutex);
if (isNotCoveredByFuturePartsImpl(part_name, reject_reason))
if (isNotCoveredByFuturePartsImpl(part_name, reject_reason, lock))
{
CurrentlyExecuting::setActualPartName(entry, part_name, *this);
return true;
@ -603,13 +600,12 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
const LogEntry & entry,
String & out_postpone_reason,
MergeTreeDataMerger & merger,
MergeTreeData & data)
MergeTreeData & data,
std::lock_guard<std::mutex> & lock) const
{
/// mutex has already been acquired. The function is called only from `selectEntryToProcess`.
if (entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART)
{
if (!isNotCoveredByFuturePartsImpl(entry.new_part_name, out_postpone_reason))
if (!isNotCoveredByFuturePartsImpl(entry.new_part_name, out_postpone_reason, lock))
{
if (!out_postpone_reason.empty())
LOG_DEBUG(log, out_postpone_reason);
@ -669,7 +665,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (entry.type == LogEntry::CLEAR_COLUMN)
{
String conflicts_description;
if (!getConflictsForClearColumnCommand(entry, &conflicts_description).empty())
if (!getConflictsForClearColumnCommand(entry, &conflicts_description, lock).empty())
{
LOG_DEBUG(log, conflicts_description);
return false;
@ -740,7 +736,7 @@ ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToP
if ((*it)->currently_executing)
continue;
if (shouldExecuteLogEntry(**it, (*it)->postpone_reason, merger, data))
if (shouldExecuteLogEntry(**it, (*it)->postpone_reason, merger, data, lock))
{
entry = *it;
queue.splice(queue.end(), queue, it);
@ -800,7 +796,7 @@ void ReplicatedMergeTreeQueue::disableMergesInRange(const String & part_name)
ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus()
ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const
{
std::lock_guard<std::mutex> lock(mutex);
@ -848,7 +844,7 @@ ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus()
}
void ReplicatedMergeTreeQueue::getEntries(LogEntriesData & res)
void ReplicatedMergeTreeQueue::getEntries(LogEntriesData & res) const
{
res.clear();
std::lock_guard<std::mutex> lock(mutex);
@ -859,7 +855,7 @@ void ReplicatedMergeTreeQueue::getEntries(LogEntriesData & res)
}
size_t ReplicatedMergeTreeQueue::countMerges()
size_t ReplicatedMergeTreeQueue::countMerges() const
{
size_t all_merges = 0;

View File

@ -1,5 +1,7 @@
#pragma once
#include <optional>
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/MergeTree/MergeTreeData.h>
@ -87,27 +89,35 @@ private:
/// Load (initialize) a queue from ZooKeeper (/replicas/me/queue/).
bool load(zkutil::ZooKeeperPtr zookeeper);
void insertUnlocked(LogEntryPtr & entry);
void insertUnlocked(LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed, std::lock_guard<std::mutex> &);
void remove(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry);
/** Can I now try this action. If not, you need to leave it in the queue and try another one.
* Called under queue_mutex.
*/
bool shouldExecuteLogEntry(const LogEntry & entry, String & out_postpone_reason, MergeTreeDataMerger & merger, MergeTreeData & data);
bool shouldExecuteLogEntry(const LogEntry & entry, String & out_postpone_reason, MergeTreeDataMerger & merger, MergeTreeData & data,
std::lock_guard<std::mutex> &) const;
/** Check that part isn't in currently generating parts and isn't covered by them.
* Should be called under queue's mutex.
*/
bool isNotCoveredByFuturePartsImpl(const String & new_part_name, String & out_reason);
bool isNotCoveredByFuturePartsImpl(const String & new_part_name, String & out_reason, std::lock_guard<std::mutex> &) const;
/// After removing the queue element, update the insertion times in the RAM. Running under queue_mutex.
/// Returns information about what times have changed - this information can be passed to updateTimesInZooKeeper.
void updateTimesOnRemoval(const LogEntryPtr & entry, bool & min_unprocessed_insert_time_changed, bool & max_processed_insert_time_changed);
void updateTimesOnRemoval(const LogEntryPtr & entry,
std::optional<time_t> & min_unprocessed_insert_time_changed,
std::optional<time_t> & max_processed_insert_time_changed,
std::unique_lock<std::mutex> &);
/// Update the insertion times in ZooKeeper.
void updateTimesInZooKeeper(zkutil::ZooKeeperPtr zookeeper, bool min_unprocessed_insert_time_changed, bool max_processed_insert_time_changed);
void updateTimesInZooKeeper(zkutil::ZooKeeperPtr zookeeper,
std::optional<time_t> min_unprocessed_insert_time_changed,
std::optional<time_t> max_processed_insert_time_changed) const;
/// Returns list of currently executing entries blocking execution of specified CLEAR_COLUMN command
Queue getConflictsForClearColumnCommand(const LogEntry & entry, String * out_conflicts_description, std::lock_guard<std::mutex> &) const;
/// Marks the element of the queue as running.
class CurrentlyExecuting
@ -165,12 +175,6 @@ public:
*/
void disableMergesAndFetchesInRange(const LogEntry & entry);
/** Returns list of currently executing entries blocking execution of specified CLEAR_COLUMN command
* Call it under mutex
*/
Queue getConflictsForClearColumnCommand(const LogEntry & entry, String * out_conflicts_description);
/** In the case where there are not enough parts to perform the merge in part_name
* - move actions with merged parts to the end of the queue
* (in order to download an already merged part from another replica).
@ -203,7 +207,7 @@ public:
bool addFuturePartIfNotCoveredByThem(const String & part_name, const LogEntry & entry, String & reject_reason);
/// Count the number of merges in the queue.
size_t countMerges();
size_t countMerges() const;
struct Status
{
@ -220,11 +224,11 @@ public:
};
/// Get information about the queue.
Status getStatus();
Status getStatus() const;
/// Get the data of the queue elements.
using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>;
void getEntries(LogEntriesData & res);
void getEntries(LogEntriesData & res) const;
/// Get information about the insertion times.
void getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const;
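
Two idioms recur throughout this refactoring: the bool changed flags become std::optional<time_t> out-parameters that carry the new value itself (so the later ZooKeeper update uses the captured value instead of re-reading the unsynchronized member), and private helpers gain an unused std::lock_guard or unique_lock reference as a compile-time witness that the caller holds the mutex. A combined sketch with illustrative names; the compiler cannot verify it is the right mutex, but the helper can no longer be called without any lock:

```cpp
#include <ctime>
#include <mutex>
#include <optional>
#include <vector>

class Queue
{
public:
    void insert(time_t create_time)
    {
        std::optional<time_t> min_time_changed;
        {
            std::lock_guard<std::mutex> lock(mutex);
            insertUnlocked(create_time, min_time_changed, lock);  /// proof that `mutex` is held
        }
        /// Publish outside the lock, using the captured value, not the member.
        if (min_time_changed)
            publishMinUnprocessedInsertTime(*min_time_changed);
    }

private:
    /// The lock parameter is never used; it exists only as evidence.
    void insertUnlocked(time_t create_time,
                        std::optional<time_t> & min_time_changed,
                        std::lock_guard<std::mutex> &)
    {
        queue.push_back(create_time);
        if (create_time && (!min_unprocessed_insert_time || create_time < min_unprocessed_insert_time))
        {
            min_unprocessed_insert_time = create_time;
            min_time_changed = min_unprocessed_insert_time;  /// carries the value, not just "changed"
        }
    }

    void publishMinUnprocessedInsertTime(time_t /*value*/) { /* e.g. write to ZooKeeper */ }

    std::mutex mutex;
    std::vector<time_t> queue;
    time_t min_unprocessed_insert_time = 0;
};

int main()
{
    Queue q;
    q.insert(100);
    q.insert(50);
}
```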

View File

@ -140,6 +140,10 @@ static const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000;
extern const int MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER = 5 * 60;
/** For randomized selection of replicas. */
thread_local pcg64 rng{randomSeed()};
void StorageReplicatedMergeTree::setZooKeeper(zkutil::ZooKeeperPtr zookeeper)
{
std::lock_guard<std::mutex> lock(current_zookeeper_mutex);
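
Moving rng from a per-storage member to a thread_local gives each thread its own engine, eliminating data races on the generator state without locking. A sketch, with std::mt19937_64 standing in for pcg64 and std::random_device standing in for randomSeed():

```cpp
#include <cstddef>
#include <random>

/// One engine per thread, lazily constructed on first use in each thread;
/// no locking is needed because threads never share an engine.
thread_local std::mt19937_64 rng{std::random_device{}()};

size_t pickReplica(size_t num_replicas)
{
    return std::uniform_int_distribution<size_t>(0, num_replicas - 1)(rng);
}

int main()
{
    return pickReplica(3) < 3 ? 0 : 1;
}
```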

View File

@ -284,8 +284,6 @@ private:
Logger * log;
pcg64 rng{randomSeed()};
/// Initialization.
/** Creates the minimum set of nodes in ZooKeeper.

View File

@ -66,7 +66,7 @@ void StorageSystemParts::processNextStorage(MutableColumns & columns, const Stor
columns[i++]->insert(static_cast<UInt64>(part->rows_count));
columns[i++]->insert(static_cast<UInt64>(part->size_in_bytes));
columns[i++]->insert(static_cast<UInt64>(part->modification_time));
columns[i++]->insert(static_cast<UInt64>(part->remove_time));
columns[i++]->insert(static_cast<UInt64>(part->remove_time.load(std::memory_order_relaxed)));
/// For convenience, in the returned refcount, don't add references that were due to local variables in this method: all_parts, active_parts.
columns[i++]->insert(static_cast<UInt64>(part.use_count() - 1));

View File

@ -112,7 +112,7 @@ void StorageSystemPartsColumns::processNextStorage(MutableColumns & columns, con
columns[j++]->insert(static_cast<UInt64>(part->rows_count));
columns[j++]->insert(static_cast<UInt64>(part->size_in_bytes));
columns[j++]->insert(static_cast<UInt64>(part->modification_time));
columns[j++]->insert(static_cast<UInt64>(part->remove_time));
columns[j++]->insert(static_cast<UInt64>(part->remove_time.load(std::memory_order_relaxed)));
columns[j++]->insert(static_cast<UInt64>(use_count));

View File

@ -17,7 +17,7 @@ scp ./dbms/src/Server/clickhouse yourserver:~/clickhouse-asan
# Start ClickHouse and run tests
sudo -u clickhouse ./clickhouse-asan --config /etc/clickhouse-server/config.xml
sudo -u clickhouse ./clickhouse-asan server --config /etc/clickhouse-server/config.xml
# How to use Thread Sanitizer
@ -35,4 +35,4 @@ scp ./dbms/src/Server/clickhouse yourserver:~/clickhouse-tsan
# Start ClickHouse and run tests
sudo -u clickhouse TSAN_OPTIONS='halt_on_error=1' ./clickhouse-tsan --config /etc/clickhouse-server/config.xml
sudo -u clickhouse TSAN_OPTIONS='halt_on_error=1 suppressions=tsan_suppressions' ./clickhouse-tsan server --config /etc/clickhouse-server/config.xml

View File

@ -0,0 +1,2 @@
# ZooKeeper C library is trash:
race:contrib/zookeeper

View File

@ -1,5 +1,5 @@
if (APPLE)
if ("${CMAKE_SYSTEM_NAME}" STREQUAL "Darwin" AND NOT "${CMAKE_SYSTEM_VERSION}" VERSION_LESS "16.1.0")
if ("${CMAKE_SYSTEM_NAME}" STREQUAL "Darwin" AND NOT "${CMAKE_SYSTEM_VERSION}" VERSION_LESS "16.0.0")
set (APPLE_SIERRA_OR_NEWER 1)
else ()
set (APPLE_SIERRA_OR_NEWER 0)