Mirror of https://github.com/ClickHouse/ClickHouse.git

Commit bf66a07b1e: Merge remote-tracking branch 'origin/master' into pr-3-way-joins
@@ -103,6 +103,26 @@ namespace Net
 		///
 		/// The default limit is 100.
 
+	int getNameLengthLimit() const;
+		/// Returns the maximum length of a field name.
+		///
+		/// See setNameLengthLimit() for more information.
+
+	void setNameLengthLimit(int limit);
+		/// Sets the maximum length of a field name.
+		///
+		/// The default limit is 256.
+
+	int getValueLengthLimit() const;
+		/// Returns the maximum length of a field value.
+		///
+		/// See setValueLengthLimit() for more information.
+
+	void setValueLengthLimit(int limit);
+		/// Sets the maximum length of a field value.
+		///
+		/// The default limit is 8192.
+
 	bool hasToken(const std::string & fieldName, const std::string & token) const;
 		/// Returns true iff the field with the given fieldName contains
 		/// the given token. Tokens in a header field are expected to be
@@ -157,12 +177,14 @@ namespace Net
 	enum Limits
 		/// Limits for basic sanity checks when reading a header
 	{
-		MAX_NAME_LENGTH = 256,
-		MAX_VALUE_LENGTH = 8192,
+		DFL_NAME_LENGTH_LIMIT = 256,
+		DFL_VALUE_LENGTH_LIMIT = 8192,
 		DFL_FIELD_LIMIT = 100
 	};
 
 	int _fieldLimit;
+	int _nameLengthLimit;
+	int _valueLengthLimit;
 };
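For reference, a minimal usage sketch of the limit API introduced above, assuming a standalone Poco build; the header content, chosen limits, and main() wrapper are illustrative, not part of the commit:

#include <sstream>
#include <Poco/Net/MessageHeader.h>

int main()
{
    Poco::Net::MessageHeader header;
    header.setFieldLimit(200);          // default is 100 fields
    header.setNameLengthLimit(512);     // default is 256 bytes
    header.setValueLengthLimit(16384);  // default is 8192 bytes

    // Parse a small, well-formed header block under these limits.
    std::istringstream raw("Content-Type: text/plain\r\nX-Trace-Id: abc123\r\n\r\n");
    header.read(raw);
    return header.get("X-Trace-Id") == "abc123" ? 0 : 1;
}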
@@ -28,14 +28,18 @@ namespace Net {
 
 
 MessageHeader::MessageHeader():
-	_fieldLimit(DFL_FIELD_LIMIT)
+	_fieldLimit(DFL_FIELD_LIMIT),
+	_nameLengthLimit(DFL_NAME_LENGTH_LIMIT),
+	_valueLengthLimit(DFL_VALUE_LENGTH_LIMIT)
 {
 }
 
 
 MessageHeader::MessageHeader(const MessageHeader& messageHeader):
 	NameValueCollection(messageHeader),
-	_fieldLimit(DFL_FIELD_LIMIT)
+	_fieldLimit(DFL_FIELD_LIMIT),
+	_nameLengthLimit(DFL_NAME_LENGTH_LIMIT),
+	_valueLengthLimit(DFL_VALUE_LENGTH_LIMIT)
 {
 }
@@ -80,12 +84,12 @@ void MessageHeader::read(std::istream& istr)
 			throw MessageException("Too many header fields");
 		name.clear();
 		value.clear();
-		while (ch != eof && ch != ':' && ch != '\n' && name.length() < MAX_NAME_LENGTH) { name += ch; ch = buf.sbumpc(); }
+		while (ch != eof && ch != ':' && ch != '\n' && name.length() < _nameLengthLimit) { name += ch; ch = buf.sbumpc(); }
 		if (ch == '\n') { ch = buf.sbumpc(); continue; } // ignore invalid header lines
 		if (ch != ':') throw MessageException("Field name too long/no colon found");
 		if (ch != eof) ch = buf.sbumpc(); // ':'
 		while (ch != eof && Poco::Ascii::isSpace(ch) && ch != '\r' && ch != '\n') ch = buf.sbumpc();
-		while (ch != eof && ch != '\r' && ch != '\n' && value.length() < MAX_VALUE_LENGTH) { value += ch; ch = buf.sbumpc(); }
+		while (ch != eof && ch != '\r' && ch != '\n' && value.length() < _valueLengthLimit) { value += ch; ch = buf.sbumpc(); }
 		if (ch == '\r') ch = buf.sbumpc();
 		if (ch == '\n')
 			ch = buf.sbumpc();
@@ -93,7 +97,7 @@ void MessageHeader::read(std::istream& istr)
 				throw MessageException("Field value too long/no CRLF found");
 			while (ch == ' ' || ch == '\t') // folding
 			{
-				while (ch != eof && ch != '\r' && ch != '\n' && value.length() < MAX_VALUE_LENGTH) { value += ch; ch = buf.sbumpc(); }
+				while (ch != eof && ch != '\r' && ch != '\n' && value.length() < _valueLengthLimit) { value += ch; ch = buf.sbumpc(); }
 				if (ch == '\r') ch = buf.sbumpc();
 				if (ch == '\n')
 					ch = buf.sbumpc();
@@ -122,6 +126,32 @@ void MessageHeader::setFieldLimit(int limit)
 }
 
 
+int MessageHeader::getNameLengthLimit() const
+{
+	return _nameLengthLimit;
+}
+
+void MessageHeader::setNameLengthLimit(int limit)
+{
+	poco_assert(limit >= 0);
+
+	_nameLengthLimit = limit;
+}
+
+
+int MessageHeader::getValueLengthLimit() const
+{
+	return _valueLengthLimit;
+}
+
+void MessageHeader::setValueLengthLimit(int limit)
+{
+	poco_assert(limit >= 0);
+
+	_valueLengthLimit = limit;
+}
+
+
 bool MessageHeader::hasToken(const std::string& fieldName, const std::string& token) const
 {
 	std::string field = get(fieldName, "");
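A companion sketch of the failure path: once a limit is hit, read() above throws rather than silently truncating. Assuming Poco::Net::MessageException is available from Poco/Net/NetException.h; the oversized field name below is illustrative:

#include <iostream>
#include <sstream>
#include <Poco/Net/MessageHeader.h>
#include <Poco/Net/NetException.h>

int main()
{
    Poco::Net::MessageHeader header;
    header.setNameLengthLimit(8); // any name longer than 8 bytes must fail

    // The field name below exceeds the limit, so the parser stops before
    // reaching the ':' and throws "Field name too long/no colon found".
    std::istringstream raw("ThisFieldNameIsTooLong: value\r\n\r\n");
    try
    {
        header.read(raw);
    }
    catch (const Poco::Net::MessageException & e)
    {
        std::cout << "rejected: " << e.displayText() << "\n";
    }
    return 0;
}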
@@ -120,6 +120,12 @@ setup_aws_credentials() {
     local minio_root_user=${MINIO_ROOT_USER:-clickhouse}
     local minio_root_password=${MINIO_ROOT_PASSWORD:-clickhouse}
     mkdir -p ~/.aws
+    if [[ -f ~/.aws/credentials ]]; then
+        if grep -q "^\[default\]" ~/.aws/credentials; then
+            echo "The credentials file contains a [default] section."
+            return
+        fi
+    fi
     cat <<EOT >> ~/.aws/credentials
 [default]
 aws_access_key_id=${minio_root_user}
@@ -52,6 +52,9 @@ namespace S3RequestSetting
 {
     extern const S3RequestSettingsBool allow_native_copy;
     extern const S3RequestSettingsString storage_class_name;
+    extern const S3RequestSettingsUInt64 http_max_fields;
+    extern const S3RequestSettingsUInt64 http_max_field_name_size;
+    extern const S3RequestSettingsUInt64 http_max_field_value_size;
 }
 
 namespace ErrorCodes
@@ -100,6 +103,9 @@ namespace
         client_configuration.requestTimeoutMs = 60 * 60 * 1000;
         client_configuration.http_keep_alive_timeout = S3::DEFAULT_KEEP_ALIVE_TIMEOUT;
         client_configuration.http_keep_alive_max_requests = S3::DEFAULT_KEEP_ALIVE_MAX_REQUESTS;
+        client_configuration.http_max_fields = request_settings[S3RequestSetting::http_max_fields];
+        client_configuration.http_max_field_name_size = request_settings[S3RequestSetting::http_max_field_name_size];
+        client_configuration.http_max_field_value_size = request_settings[S3RequestSetting::http_max_field_value_size];
 
         S3::ClientSettings client_settings{
             .use_virtual_addressing = s3_uri.is_virtual_hosted_style,
@@ -56,6 +56,9 @@ void CascadeWriteBuffer::nextImpl()
 
 CascadeWriteBuffer::WriteBufferPtrs CascadeWriteBuffer::getResultBuffers()
 {
+    if (!curr_buffer)
+        return {};
+
     /// Sync position with underlying buffer before invalidating
     curr_buffer->position() = position();
 
@@ -163,6 +163,9 @@ PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & client_config
     , remote_host_filter(client_configuration.remote_host_filter)
     , s3_max_redirects(client_configuration.s3_max_redirects)
     , s3_use_adaptive_timeouts(client_configuration.s3_use_adaptive_timeouts)
+    , http_max_fields(client_configuration.http_max_fields)
+    , http_max_field_name_size(client_configuration.http_max_field_name_size)
+    , http_max_field_value_size(client_configuration.http_max_field_value_size)
     , enable_s3_requests_logging(client_configuration.enable_s3_requests_logging)
     , for_disk_s3(client_configuration.for_disk_s3)
     , get_request_throttler(client_configuration.get_request_throttler)
@@ -466,6 +469,9 @@ void PocoHTTPClient::makeRequestInternalImpl(
         }
 
         Poco::Net::HTTPResponse poco_response;
+        poco_response.setFieldLimit(static_cast<int>(http_max_fields));
+        poco_response.setNameLengthLimit(static_cast<int>(http_max_field_name_size));
+        poco_response.setValueLengthLimit(static_cast<int>(http_max_field_value_size));
 
         Stopwatch watch;
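The setters applied to poco_response above work because Poco::Net::HTTPResponse inherits them from MessageHeader via HTTPMessage. A hedged sketch of the same pattern in a plain Poco client, outside ClickHouse; the host, port, and limit values are illustrative (the limits simply mirror the defaults used in this commit):

#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>

int main()
{
    Poco::Net::HTTPClientSession session("example.com", 80);
    Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, "/");
    session.sendRequest(request);

    // Cap header growth before receiveResponse() parses the wire data;
    // a violation surfaces as a Poco::Net::MessageException.
    Poco::Net::HTTPResponse response;
    response.setFieldLimit(1000000);
    response.setNameLengthLimit(128 * 1024);
    response.setValueLengthLimit(128 * 1024);

    std::istream & body = session.receiveResponse(response);
    (void)body;
    return 0;
}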
@@ -57,6 +57,10 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
     size_t http_keep_alive_timeout = DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT;
     size_t http_keep_alive_max_requests = DEFAULT_HTTP_KEEP_ALIVE_MAX_REQUEST;
 
+    UInt64 http_max_fields = 1000000;
+    UInt64 http_max_field_name_size = 128 * 1024;
+    UInt64 http_max_field_value_size = 128 * 1024;
+
     std::function<void(const ProxyConfiguration &)> error_report;
 
     void updateSchemeAndRegion();
@@ -177,6 +181,9 @@ protected:
     const RemoteHostFilter & remote_host_filter;
     unsigned int s3_max_redirects = 0;
     bool s3_use_adaptive_timeouts = true;
+    const UInt64 http_max_fields = 1000000;
+    const UInt64 http_max_field_name_size = 128 * 1024;
+    const UInt64 http_max_field_value_size = 128 * 1024;
     bool enable_s3_requests_logging = false;
     bool for_disk_s3 = false;
@@ -37,7 +37,10 @@ namespace ErrorCodes
     DECLARE(Bool, check_objects_after_upload, S3::DEFAULT_CHECK_OBJECTS_AFTER_UPLOAD, "", 0) \
     DECLARE(Bool, throw_on_zero_files_match, false, "", 0) \
     DECLARE(UInt64, max_single_operation_copy_size, S3::DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE, "", 0) \
-    DECLARE(String, storage_class_name, "", "", 0)
+    DECLARE(String, storage_class_name, "", "", 0) \
+    DECLARE(UInt64, http_max_fields, 1000000, "", 0) \
+    DECLARE(UInt64, http_max_field_name_size, 128 * 1024, "", 0) \
+    DECLARE(UInt64, http_max_field_value_size, 128 * 1024, "", 0)
 
 #define PART_UPLOAD_SETTINGS(DECLARE, ALIAS) \
     DECLARE(UInt64, strict_upload_part_size, 0, "", 0) \
@@ -6054,7 +6054,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartRestoredFromBackup(cons
             if (!retryable || (try_no + 1 == loading_parts_max_tries))
             {
                 if (Exception * e = exception_cast<Exception *>(error))
-                    e->addMessage("while restoring part {} of table {}", part->name, getStorageID());
+                    e->addMessage("while restoring part {} of table {}", part_name, getStorageID());
                 std::rethrow_exception(error);
             }
@@ -17,6 +17,7 @@
 #include <Common/ZooKeeper/ZooKeeper.h>
 #include <Common/getRandomASCIIString.h>
 #include <Common/randomSeed.h>
+#include <Common/DNSResolver.h>
 #include <numeric>
 
@@ -438,6 +439,163 @@ ObjectStorageQueueTableMetadata ObjectStorageQueueMetadata::syncWithKeeper(
             "of wrong zookeeper path or because of logical error");
     }
 
+namespace
+{
+    struct Info
+    {
+        std::string hostname;
+        std::string table_id;
+
+        bool operator ==(const Info & other) const
+        {
+            return hostname == other.hostname && table_id == other.table_id;
+        }
+
+        static Info create(const StorageID & storage_id)
+        {
+            Info self;
+            self.hostname = DNSResolver::instance().getHostName();
+            self.table_id = storage_id.hasUUID() ? toString(storage_id.uuid) : storage_id.getFullTableName();
+            return self;
+        }
+
+        std::string serialize() const
+        {
+            WriteBufferFromOwnString buf;
+            size_t version = 0;
+            buf << version << "\n";
+            buf << hostname << "\n";
+            buf << table_id << "\n";
+            return buf.str();
+        }
+
+        static Info deserialize(const std::string & str)
+        {
+            ReadBufferFromString buf(str);
+            Info info;
+            size_t version;
+            buf >> version >> "\n";
+            buf >> info.hostname >> "\n";
+            buf >> info.table_id >> "\n";
+            return info;
+        }
+    };
+}
+
+void ObjectStorageQueueMetadata::registerIfNot(const StorageID & storage_id)
+{
+    const auto registry_path = zookeeper_path / "registry";
+    const auto self = Info::create(storage_id);
+
+    Coordination::Error code;
+    for (size_t i = 0; i < 1000; ++i)
+    {
+        Coordination::Stat stat;
+        std::string registry_str;
+        auto zk_client = getZooKeeper();
+
+        if (zk_client->tryGet(registry_path, registry_str, &stat))
+        {
+            Strings registered;
+            splitInto<','>(registered, registry_str);
+
+            for (const auto & elem : registered)
+            {
+                if (elem.empty())
+                    continue;
+
+                auto info = Info::deserialize(elem);
+                if (info == self)
+                {
+                    LOG_TRACE(log, "Table {} is already registered", self.table_id);
+                    return;
+                }
+            }
+
+            auto new_registry_str = registry_str + "," + self.serialize();
+            code = zk_client->trySet(registry_path, new_registry_str, stat.version);
+        }
+        else
+            code = zk_client->tryCreate(registry_path, self.serialize(), zkutil::CreateMode::Persistent);
+
+        if (code == Coordination::Error::ZOK)
+        {
+            LOG_TRACE(log, "Added {} to registry", self.table_id);
+            return;
+        }
+
+        if (code == Coordination::Error::ZBADVERSION
+            || code == Coordination::Error::ZSESSIONEXPIRED)
+            continue;
+
+        throw zkutil::KeeperException(code);
+    }
+    throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot register in keeper. Last error: {}", code);
+}
+
+size_t ObjectStorageQueueMetadata::unregister(const StorageID & storage_id)
+{
+    const auto registry_path = zookeeper_path / "registry";
+    const auto self = Info::create(storage_id);
+
+    Coordination::Error code = Coordination::Error::ZOK;
+    for (size_t i = 0; i < 1000; ++i)
+    {
+        Coordination::Stat stat;
+        std::string registry_str;
+        auto zk_client = getZooKeeper();
+
+        bool node_exists = zk_client->tryGet(registry_path, registry_str, &stat);
+        if (!node_exists)
+        {
+            LOG_WARNING(log, "Cannot unregister: registry does not exist");
+            chassert(false);
+            return 0;
+        }
+
+        Strings registered;
+        splitInto<','>(registered, registry_str);
+
+        bool found = false;
+        std::string new_registry_str;
+        size_t count = 0;
+        for (const auto & elem : registered)
+        {
+            if (elem.empty())
+                continue;
+
+            auto info = Info::deserialize(elem);
+            if (info == self)
+                found = true;
+            else
+            {
+                if (!new_registry_str.empty())
+                    new_registry_str += ",";
+                new_registry_str += elem;
+                count += 1;
+            }
+        }
+        if (!found)
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unregister: not registered");
+
+        code = zk_client->trySet(registry_path, new_registry_str, stat.version);
+
+        if (code == Coordination::Error::ZOK)
+            return count;
+
+        if (Coordination::isHardwareError(code)
+            || code == Coordination::Error::ZBADVERSION)
+            continue;
+
+        throw zkutil::KeeperException(code);
+    }
+
+    if (Coordination::isHardwareError(code))
+        throw zkutil::KeeperException(code);
+    else
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unregister in keeper. Last error: {}", code);
+}
+
 void ObjectStorageQueueMetadata::cleanupThreadFunc()
 {
     /// A background task is responsible for maintaining
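For clarity, a standalone approximation of the registry wire format implemented by Info::serialize()/deserialize() above: each entry is "<version>\n<hostname>\n<table_id>\n", and entries are concatenated with ',' separators in the registry znode. The sketch below substitutes std::stringstream for ClickHouse's WriteBuffer/ReadBuffer helpers, and the hostnames and UUIDs are made up:

#include <iostream>
#include <sstream>
#include <string>

struct Info
{
    std::string hostname;
    std::string table_id;

    // Entry format: "<version>\n<hostname>\n<table_id>\n", version currently 0.
    std::string serialize() const
    {
        std::ostringstream buf;
        buf << 0 << "\n" << hostname << "\n" << table_id << "\n";
        return buf.str();
    }

    static Info deserialize(const std::string & str)
    {
        std::istringstream buf(str);
        std::string version;
        Info info;
        std::getline(buf, version);       // format version, ignored here
        std::getline(buf, info.hostname);
        std::getline(buf, info.table_id);
        return info;
    }
};

int main()
{
    Info a{"node1", "aaaa-uuid"};
    Info b{"node2", "bbbb-uuid"};

    // Joined registry content as stored in the "registry" znode; splitting on
    // ',' recovers the entries (empty fragments are skipped by the reader).
    std::string registry = a.serialize() + "," + b.serialize();
    std::cout << registry; // "0\nnode1\naaaa-uuid\n,0\nnode2\nbbbb-uuid\n"
    return 0;
}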
@@ -71,6 +71,9 @@ public:
         bool is_attach,
         LoggerPtr log);
 
+    void registerIfNot(const StorageID & storage_id);
+    size_t unregister(const StorageID & storage_id);
+
     void shutdown();
 
     FileMetadataPtr getFileMetadata(const std::string & path, ObjectStorageQueueOrderedFileMetadata::BucketInfoPtr bucket_info = {});
@@ -16,7 +16,8 @@ ObjectStorageQueueMetadataFactory & ObjectStorageQueueMetadataFactory::instance(
 
 ObjectStorageQueueMetadataFactory::FilesMetadataPtr ObjectStorageQueueMetadataFactory::getOrCreate(
     const std::string & zookeeper_path,
-    ObjectStorageQueueMetadataPtr metadata)
+    ObjectStorageQueueMetadataPtr metadata,
+    const StorageID & storage_id)
 {
     std::lock_guard lock(mutex);
     auto it = metadata_by_path.find(zookeeper_path);
@@ -30,13 +31,14 @@ ObjectStorageQueueMetadataFactory::FilesMetadataPtr ObjectStorageQueueMetadataFa
         auto & metadata_from_keeper = it->second.metadata->getTableMetadata();
 
         metadata_from_table.checkEquals(metadata_from_keeper);
-
-        it->second.ref_count += 1;
     }
+
+    it->second.metadata->registerIfNot(storage_id);
+    it->second.ref_count += 1;
     return it->second.metadata;
 }
 
-void ObjectStorageQueueMetadataFactory::remove(const std::string & zookeeper_path)
+void ObjectStorageQueueMetadataFactory::remove(const std::string & zookeeper_path, const StorageID & storage_id)
 {
     std::lock_guard lock(mutex);
     auto it = metadata_by_path.find(zookeeper_path);
@@ -44,28 +46,52 @@ void ObjectStorageQueueMetadataFactory::remove(const std::string & zookeeper_pat
     if (it == metadata_by_path.end())
         throw Exception(ErrorCodes::BAD_ARGUMENTS, "Metadata with zookeeper path {} does not exist", zookeeper_path);
 
     chassert(it->second.ref_count > 0);
-    if (--it->second.ref_count == 0)
+    it->second.ref_count -= 1;
+
+    size_t registry_size;
+    try
+    {
+        registry_size = it->second.metadata->unregister(storage_id);
+        LOG_TRACE(log, "Remaining registry size: {}", registry_size);
+    }
+    catch (const zkutil::KeeperException & e)
+    {
+        if (!Coordination::isHardwareError(e.code))
+        {
+            tryLogCurrentException(__PRETTY_FUNCTION__);
+        }
+        /// Any non-zero value would do.
+        registry_size = 1;
+    }
+    catch (...)
+    {
+        tryLogCurrentException(__PRETTY_FUNCTION__);
+        /// Any non-zero value would do.
+        registry_size = 1;
+    }
+
+    if (registry_size == 0)
     {
         try
         {
             auto zk_client = Context::getGlobalContextInstance()->getZooKeeper();
-            zk_client->tryRemove(it->first);
+            zk_client->removeRecursive(it->first);
         }
         catch (...)
        {
-            tryLogCurrentException(__PRETTY_FUNCTION__);
+            tryLogCurrentException(log);
         }
-
-        metadata_by_path.erase(it);
     }
+
+    if (!it->second.ref_count)
+        metadata_by_path.erase(it);
 }
 
 std::unordered_map<std::string, ObjectStorageQueueMetadataFactory::FilesMetadataPtr> ObjectStorageQueueMetadataFactory::getAll()
 {
     std::unordered_map<std::string, ObjectStorageQueueMetadataFactory::FilesMetadataPtr> result;
-    for (const auto & [zk_path, metadata_and_ref_count] : metadata_by_path)
-        result.emplace(zk_path, metadata_and_ref_count.metadata);
+    for (const auto & [zk_path, metadata] : metadata_by_path)
+        result.emplace(zk_path, metadata.metadata);
     return result;
 }
@@ -14,25 +14,25 @@ public:
 
     FilesMetadataPtr getOrCreate(
         const std::string & zookeeper_path,
-        ObjectStorageQueueMetadataPtr metadata);
+        ObjectStorageQueueMetadataPtr metadata,
+        const StorageID & storage_id);
 
-    void remove(const std::string & zookeeper_path);
+    void remove(const std::string & zookeeper_path, const StorageID & storage_id);
 
     std::unordered_map<std::string, FilesMetadataPtr> getAll();
 
 private:
-    struct Metadata
+    struct MetadataWithRefCount
     {
-        explicit Metadata(std::shared_ptr<ObjectStorageQueueMetadata> metadata_) : metadata(metadata_), ref_count(1) {}
-
+        explicit MetadataWithRefCount(std::shared_ptr<ObjectStorageQueueMetadata> metadata_) : metadata(metadata_) {}
         std::shared_ptr<ObjectStorageQueueMetadata> metadata;
+        /// TODO: the ref count should be kept in keeper, because of the case with distributed processing.
         size_t ref_count = 0;
     };
-    using MetadataByPath = std::unordered_map<std::string, Metadata>;
+    using MetadataByPath = std::unordered_map<std::string, MetadataWithRefCount>;
 
     MetadataByPath metadata_by_path;
     std::mutex mutex;
+    LoggerPtr log = getLogger("QueueMetadataFactory");
 };
 
 }
@@ -214,9 +214,12 @@ StorageObjectStorageQueue::StorageObjectStorageQueue(
         zk_path, *queue_settings_, storage_metadata.getColumns(), configuration_->format, context_, is_attach, log);
 
     auto queue_metadata = std::make_unique<ObjectStorageQueueMetadata>(
-        zk_path, std::move(table_metadata), (*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_min_ms], (*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_max_ms]);
+        zk_path,
+        std::move(table_metadata),
+        (*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_min_ms],
+        (*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_max_ms]);
 
-    files_metadata = ObjectStorageQueueMetadataFactory::instance().getOrCreate(zk_path, std::move(queue_metadata));
+    files_metadata = ObjectStorageQueueMetadataFactory::instance().getOrCreate(zk_path, std::move(queue_metadata), table_id_);
 
     task = getContext()->getSchedulePool().createTask("ObjectStorageQueueStreamingTask", [this] { threadFunc(); });
 }
@@ -248,7 +251,7 @@ void StorageObjectStorageQueue::shutdown(bool is_drop)
 
 void StorageObjectStorageQueue::drop()
 {
-    ObjectStorageQueueMetadataFactory::instance().remove(zk_path);
+    ObjectStorageQueueMetadataFactory::instance().remove(zk_path, getStorageID());
 }
 
 bool StorageObjectStorageQueue::supportsSubsetOfColumns(const ContextPtr & context_) const
@@ -2403,3 +2403,127 @@ def test_list_and_delete_race(started_cluster):
     assert node.contains_in_log(
         "because of the race with list & delete"
     ) or node_2.contains_in_log("because of the race with list & delete")
+
+
+def test_registry(started_cluster):
+    node1 = started_cluster.instances["node1"]
+    node2 = started_cluster.instances["node2"]
+
+    table_name = f"test_registry_{uuid.uuid4().hex[:8]}"
+    db_name = f"db_{table_name}"
+    dst_table_name = f"{table_name}_dst"
+    keeper_path = f"/clickhouse/test_{table_name}"
+    files_path = f"{table_name}_data"
+    files_to_generate = 1000
+
+    node1.query(f"DROP DATABASE IF EXISTS {db_name}")
+    node2.query(f"DROP DATABASE IF EXISTS {db_name}")
+
+    node1.query(
+        f"CREATE DATABASE {db_name} ENGINE=Replicated('/clickhouse/databases/replicateddb2', 'shard1', 'node1')"
+    )
+    node2.query(
+        f"CREATE DATABASE {db_name} ENGINE=Replicated('/clickhouse/databases/replicateddb2', 'shard1', 'node2')"
+    )
+
+    create_table(
+        started_cluster,
+        node1,
+        table_name,
+        "ordered",
+        files_path,
+        additional_settings={"keeper_path": keeper_path, "buckets": 3},
+        database_name=db_name,
+    )
+
+    zk = started_cluster.get_kazoo_client("zoo1")
+    registry, stat = zk.get(f"{keeper_path}/registry/")
+
+    uuid1 = node1.query(
+        f"SELECT uuid FROM system.tables WHERE database = '{db_name}' and table = '{table_name}'"
+    ).strip()
+    assert uuid1 in str(registry)
+
+    expected = [f"0\\nnode1\\n{uuid1}\\n", f"0\\nnode2\\n{uuid1}\\n"]
+
+    for elem in expected:
+        assert elem in str(registry)
+
+    total_values = generate_random_files(
+        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
+    )
+
+    create_mv(node1, f"{db_name}.{table_name}", dst_table_name)
+    create_mv(node2, f"{db_name}.{table_name}", dst_table_name)
+
+    def get_count():
+        return int(
+            node1.query(
+                f"SELECT count() FROM clusterAllReplicas(cluster, default.{dst_table_name})"
+            )
+        )
+
+    expected_rows = files_to_generate
+    for _ in range(20):
+        if expected_rows == get_count():
+            break
+        time.sleep(1)
+    assert expected_rows == get_count()
+
+    table_name_2 = f"test_registry_{uuid.uuid4().hex[:8]}_2"
+    create_table(
+        started_cluster,
+        node1,
+        table_name_2,
+        "ordered",
+        files_path,
+        additional_settings={"keeper_path": keeper_path, "buckets": 3},
+        database_name=db_name,
+    )
+
+    registry, stat = zk.get(f"{keeper_path}/registry/")
+
+    uuid2 = node1.query(
+        f"SELECT uuid FROM system.tables WHERE database = '{db_name}' and table = '{table_name_2}'"
+    ).strip()
+
+    assert uuid1 in str(registry)
+    assert uuid2 in str(registry)
+
+    expected = [
+        f"0\\nnode1\\n{uuid1}\\n",
+        f"0\\nnode2\\n{uuid1}\\n",
+        f"0\\nnode1\\n{uuid2}\\n",
+        f"0\\nnode2\\n{uuid2}\\n",
+    ]
+
+    for elem in expected:
+        assert elem in str(registry)
+
+    node1.restart_clickhouse()
+    node2.restart_clickhouse()
+
+    registry, stat = zk.get(f"{keeper_path}/registry/")
+
+    assert uuid1 in str(registry)
+    assert uuid2 in str(registry)
+
+    node1.query(f"DROP TABLE {db_name}.{table_name_2} SYNC")
+
+    assert zk.exists(keeper_path) is not None
+    registry, stat = zk.get(f"{keeper_path}/registry/")
+
+    assert uuid1 in str(registry)
+    assert uuid2 not in str(registry)
+
+    expected = [
+        f"0\\nnode1\\n{uuid1}\\n",
+        f"0\\nnode2\\n{uuid1}\\n",
+    ]
+
+    for elem in expected:
+        assert elem in str(registry)
+
+    node1.query(f"DROP TABLE {db_name}.{table_name} SYNC")
+
+    assert zk.exists(keeper_path) is None