Merge branch 'master' into change-log-level-clickhouse-local

This commit is contained in:
Alexey Milovidov 2024-08-07 22:13:02 +02:00
commit c5b63c5820
60 changed files with 791 additions and 371 deletions

View File

@ -21,6 +21,7 @@
#include "Poco/Exception.h"
#include "Poco/Foundation.h"
#include "Poco/Mutex.h"
#include "Poco/Message.h"
namespace Poco
@ -78,6 +79,10 @@ public:
///
/// The default implementation just breaks into the debugger.
virtual void logMessageImpl(Message::Priority priority, const std::string & msg) {}
/// Write a message to the log
/// Useful for logging from Poco
static void handle(const Exception & exc);
/// Invokes the currently registered ErrorHandler.
@ -87,6 +92,9 @@ public:
static void handle();
/// Invokes the currently registered ErrorHandler.
static void logMessage(Message::Priority priority, const std::string & msg);
/// Invokes the currently registered ErrorHandler to log a message.
static ErrorHandler * set(ErrorHandler * pHandler);
/// Registers the given handler as the current error handler.
///

View File

@ -8,7 +8,7 @@
// Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
// SPDX-License-Identifier: BSL-1.0
//
@ -35,79 +35,91 @@ ErrorHandler::~ErrorHandler()
void ErrorHandler::exception(const Exception& exc)
{
poco_debugger_msg(exc.what());
poco_debugger_msg(exc.what());
}
void ErrorHandler::exception(const std::exception& exc)
{
poco_debugger_msg(exc.what());
poco_debugger_msg(exc.what());
}
void ErrorHandler::exception()
{
poco_debugger_msg("unknown exception");
poco_debugger_msg("unknown exception");
}
void ErrorHandler::handle(const Exception& exc)
{
FastMutex::ScopedLock lock(_mutex);
try
{
_pHandler->exception(exc);
}
catch (...)
{
}
FastMutex::ScopedLock lock(_mutex);
try
{
_pHandler->exception(exc);
}
catch (...)
{
}
}
void ErrorHandler::handle(const std::exception& exc)
{
FastMutex::ScopedLock lock(_mutex);
try
{
_pHandler->exception(exc);
}
catch (...)
{
}
FastMutex::ScopedLock lock(_mutex);
try
{
_pHandler->exception(exc);
}
catch (...)
{
}
}
void ErrorHandler::handle()
{
FastMutex::ScopedLock lock(_mutex);
try
{
_pHandler->exception();
}
catch (...)
{
}
FastMutex::ScopedLock lock(_mutex);
try
{
_pHandler->exception();
}
catch (...)
{
}
}
/// Forwards a log message to the currently registered ErrorHandler.
///
/// The handler's logMessageImpl() is invoked while _mutex is held, the same
/// way the handle() overloads invoke exception(). Anything thrown by the
/// handler is caught and discarded, so no exception propagates to the caller.
void ErrorHandler::logMessage(Message::Priority priority, const std::string & msg)
{
    FastMutex::ScopedLock guard(_mutex);
    try
    {
        _pHandler->logMessageImpl(priority, msg);
    }
    catch (...)
    {
        // Nothing to do: whatever the handler threw is dropped here.
    }
}
ErrorHandler* ErrorHandler::set(ErrorHandler* pHandler)
{
poco_check_ptr(pHandler);
poco_check_ptr(pHandler);
FastMutex::ScopedLock lock(_mutex);
ErrorHandler* pOld = _pHandler;
_pHandler = pHandler;
return pOld;
FastMutex::ScopedLock lock(_mutex);
ErrorHandler* pOld = _pHandler;
_pHandler = pHandler;
return pOld;
}
ErrorHandler* ErrorHandler::defaultHandler()
{
// NOTE: Since this is called to initialize the static _pHandler
// variable, sh has to be a local static, otherwise we run
// into static initialization order issues.
static SingletonHolder<ErrorHandler> sh;
return sh.get();
// NOTE: Since this is called to initialize the static _pHandler
// variable, sh has to be a local static, otherwise we run
// into static initialization order issues.
static SingletonHolder<ErrorHandler> sh;
return sh.get();
}

View File

@ -17,6 +17,7 @@
#include "Poco/Net/StreamSocketImpl.h"
#include "Poco/NumberFormatter.h"
#include "Poco/Timestamp.h"
#include "Poco/ErrorHandler.h"
#include <string.h> // FD_SET needs memset on some platforms, so we can't use <cstring>

View File

@ -8,7 +8,7 @@
// Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
// SPDX-License-Identifier: BSL-1.0
//
@ -44,190 +44,194 @@ TCPServerConnectionFilter::~TCPServerConnectionFilter()
TCPServer::TCPServer(TCPServerConnectionFactory::Ptr pFactory, Poco::UInt16 portNumber, TCPServerParams::Ptr pParams):
_socket(ServerSocket(portNumber)),
_thread(threadName(_socket)),
_stopped(true)
{
Poco::ThreadPool& pool = Poco::ThreadPool::defaultPool();
if (pParams)
{
int toAdd = pParams->getMaxThreads() - pool.capacity();
if (toAdd > 0) pool.addCapacity(toAdd);
}
_pDispatcher = new TCPServerDispatcher(pFactory, pool, pParams);
_socket(ServerSocket(portNumber)),
_thread(threadName(_socket)),
_stopped(true)
{
Poco::ThreadPool& pool = Poco::ThreadPool::defaultPool();
if (pParams)
{
int toAdd = pParams->getMaxThreads() - pool.capacity();
if (toAdd > 0) pool.addCapacity(toAdd);
}
_pDispatcher = new TCPServerDispatcher(pFactory, pool, pParams);
}
TCPServer::TCPServer(TCPServerConnectionFactory::Ptr pFactory, const ServerSocket& socket, TCPServerParams::Ptr pParams):
_socket(socket),
_thread(threadName(socket)),
_stopped(true)
_socket(socket),
_thread(threadName(socket)),
_stopped(true)
{
Poco::ThreadPool& pool = Poco::ThreadPool::defaultPool();
if (pParams)
{
int toAdd = pParams->getMaxThreads() - pool.capacity();
if (toAdd > 0) pool.addCapacity(toAdd);
}
_pDispatcher = new TCPServerDispatcher(pFactory, pool, pParams);
Poco::ThreadPool& pool = Poco::ThreadPool::defaultPool();
if (pParams)
{
int toAdd = pParams->getMaxThreads() - pool.capacity();
if (toAdd > 0) pool.addCapacity(toAdd);
}
_pDispatcher = new TCPServerDispatcher(pFactory, pool, pParams);
}
TCPServer::TCPServer(TCPServerConnectionFactory::Ptr pFactory, Poco::ThreadPool& threadPool, const ServerSocket& socket, TCPServerParams::Ptr pParams):
_socket(socket),
_pDispatcher(new TCPServerDispatcher(pFactory, threadPool, pParams)),
_thread(threadName(socket)),
_stopped(true)
_socket(socket),
_pDispatcher(new TCPServerDispatcher(pFactory, threadPool, pParams)),
_thread(threadName(socket)),
_stopped(true)
{
}
TCPServer::~TCPServer()
{
try
{
stop();
_pDispatcher->release();
}
catch (...)
{
poco_unexpected();
}
try
{
stop();
_pDispatcher->release();
}
catch (...)
{
poco_unexpected();
}
}
const TCPServerParams& TCPServer::params() const
{
return _pDispatcher->params();
return _pDispatcher->params();
}
void TCPServer::start()
{
poco_assert (_stopped);
poco_assert (_stopped);
_stopped = false;
_thread.start(*this);
_stopped = false;
_thread.start(*this);
}
void TCPServer::stop()
{
if (!_stopped)
{
_stopped = true;
_thread.join();
_pDispatcher->stop();
}
if (!_stopped)
{
_stopped = true;
_thread.join();
_pDispatcher->stop();
}
}
void TCPServer::run()
{
while (!_stopped)
{
Poco::Timespan timeout(250000);
try
{
if (_socket.poll(timeout, Socket::SELECT_READ))
{
try
{
StreamSocket ss = _socket.acceptConnection();
if (!_pConnectionFilter || _pConnectionFilter->accept(ss))
{
// enable nodelay per default: OSX really needs that
while (!_stopped)
{
Poco::Timespan timeout(250000);
try
{
if (_socket.poll(timeout, Socket::SELECT_READ))
{
try
{
StreamSocket ss = _socket.acceptConnection();
if (!_pConnectionFilter || _pConnectionFilter->accept(ss))
{
// enable nodelay per default: OSX really needs that
#if defined(POCO_OS_FAMILY_UNIX)
if (ss.address().family() != AddressFamily::UNIX_LOCAL)
if (ss.address().family() != AddressFamily::UNIX_LOCAL)
#endif
{
ss.setNoDelay(true);
}
_pDispatcher->enqueue(ss);
}
}
catch (Poco::Exception& exc)
{
ErrorHandler::handle(exc);
}
catch (std::exception& exc)
{
ErrorHandler::handle(exc);
}
catch (...)
{
ErrorHandler::handle();
}
}
}
catch (Poco::Exception& exc)
{
ErrorHandler::handle(exc);
// possibly a resource issue since poll() failed;
// give some time to recover before trying again
Poco::Thread::sleep(50);
}
}
{
ss.setNoDelay(true);
}
_pDispatcher->enqueue(ss);
}
else
{
ErrorHandler::logMessage(Message::PRIO_WARNING, "Filtered out connection from " + ss.peerAddress().toString());
}
}
catch (Poco::Exception& exc)
{
ErrorHandler::handle(exc);
}
catch (std::exception& exc)
{
ErrorHandler::handle(exc);
}
catch (...)
{
ErrorHandler::handle();
}
}
}
catch (Poco::Exception& exc)
{
ErrorHandler::handle(exc);
// possibly a resource issue since poll() failed;
// give some time to recover before trying again
Poco::Thread::sleep(50);
}
}
}
int TCPServer::currentThreads() const
{
return _pDispatcher->currentThreads();
return _pDispatcher->currentThreads();
}
int TCPServer::maxThreads() const
{
return _pDispatcher->maxThreads();
return _pDispatcher->maxThreads();
}
int TCPServer::totalConnections() const
{
return _pDispatcher->totalConnections();
return _pDispatcher->totalConnections();
}
int TCPServer::currentConnections() const
{
return _pDispatcher->currentConnections();
return _pDispatcher->currentConnections();
}
int TCPServer::maxConcurrentConnections() const
{
return _pDispatcher->maxConcurrentConnections();
return _pDispatcher->maxConcurrentConnections();
}
int TCPServer::queuedConnections() const
{
return _pDispatcher->queuedConnections();
return _pDispatcher->queuedConnections();
}
int TCPServer::refusedConnections() const
{
return _pDispatcher->refusedConnections();
return _pDispatcher->refusedConnections();
}
void TCPServer::setConnectionFilter(const TCPServerConnectionFilter::Ptr& pConnectionFilter)
{
poco_assert (_stopped);
poco_assert (_stopped);
_pConnectionFilter = pConnectionFilter;
_pConnectionFilter = pConnectionFilter;
}
std::string TCPServer::threadName(const ServerSocket& socket)
{
std::string name("TCPServer: ");
name.append(socket.address().toString());
return name;
std::string name("TCPServer: ");
name.append(socket.address().toString());
return name;
}

View File

@ -8,7 +8,7 @@
// Copyright (c) 2005-2007, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
// SPDX-License-Identifier: BSL-1.0
//
@ -33,44 +33,44 @@ namespace Net {
class TCPConnectionNotification: public Notification
{
public:
TCPConnectionNotification(const StreamSocket& socket):
_socket(socket)
{
}
~TCPConnectionNotification()
{
}
const StreamSocket& socket() const
{
return _socket;
}
TCPConnectionNotification(const StreamSocket& socket):
_socket(socket)
{
}
~TCPConnectionNotification()
{
}
const StreamSocket& socket() const
{
return _socket;
}
private:
StreamSocket _socket;
StreamSocket _socket;
};
TCPServerDispatcher::TCPServerDispatcher(TCPServerConnectionFactory::Ptr pFactory, Poco::ThreadPool& threadPool, TCPServerParams::Ptr pParams):
_rc(1),
_pParams(pParams),
_currentThreads(0),
_totalConnections(0),
_currentConnections(0),
_maxConcurrentConnections(0),
_refusedConnections(0),
_stopped(false),
_pConnectionFactory(pFactory),
_threadPool(threadPool)
_rc(1),
_pParams(pParams),
_currentThreads(0),
_totalConnections(0),
_currentConnections(0),
_maxConcurrentConnections(0),
_refusedConnections(0),
_stopped(false),
_pConnectionFactory(pFactory),
_threadPool(threadPool)
{
poco_check_ptr (pFactory);
poco_check_ptr (pFactory);
if (!_pParams)
_pParams = new TCPServerParams;
if (_pParams->getMaxThreads() == 0)
_pParams->setMaxThreads(threadPool.capacity());
if (!_pParams)
_pParams = new TCPServerParams;
if (_pParams->getMaxThreads() == 0)
_pParams->setMaxThreads(threadPool.capacity());
}
@ -81,161 +81,184 @@ TCPServerDispatcher::~TCPServerDispatcher()
void TCPServerDispatcher::duplicate()
{
++_rc;
++_rc;
}
void TCPServerDispatcher::release()
{
if (--_rc == 0) delete this;
if (--_rc == 0) delete this;
}
void TCPServerDispatcher::run()
{
AutoPtr<TCPServerDispatcher> guard(this); // ensure object stays alive
AutoPtr<TCPServerDispatcher> guard(this); // ensure object stays alive
int idleTime = (int) _pParams->getThreadIdleTime().totalMilliseconds();
int idleTime = (int) _pParams->getThreadIdleTime().totalMilliseconds();
for (;;)
{
try
{
AutoPtr<Notification> pNf = _queue.waitDequeueNotification(idleTime);
if (pNf && !_stopped)
{
TCPConnectionNotification* pCNf = dynamic_cast<TCPConnectionNotification*>(pNf.get());
if (pCNf)
{
beginConnection();
if (!_stopped)
{
std::unique_ptr<TCPServerConnection> pConnection(_pConnectionFactory->createConnection(pCNf->socket()));
poco_check_ptr(pConnection.get());
pConnection->start();
}
/// endConnection() should be called after destroying TCPServerConnection,
/// otherwise currentConnections() could become zero while some connections are yet still alive.
endConnection();
}
}
}
catch (Poco::Exception &exc) { ErrorHandler::handle(exc); }
catch (std::exception &exc) { ErrorHandler::handle(exc); }
catch (...) { ErrorHandler::handle(); }
FastMutex::ScopedLock lock(_mutex);
if (_stopped || (_currentThreads > 1 && _queue.empty()))
{
--_currentThreads;
break;
}
}
for (;;)
{
try
{
AutoPtr<Notification> pNf = _queue.waitDequeueNotification(idleTime);
if (pNf && !_stopped)
{
TCPConnectionNotification* pCNf = dynamic_cast<TCPConnectionNotification*>(pNf.get());
if (pCNf)
{
beginConnection();
if (!_stopped)
{
std::unique_ptr<TCPServerConnection> pConnection(_pConnectionFactory->createConnection(pCNf->socket()));
poco_check_ptr(pConnection.get());
pConnection->start();
}
/// endConnection() should be called after destroying TCPServerConnection,
/// otherwise currentConnections() could become zero while some connections are yet still alive.
endConnection();
}
}
}
catch (Poco::Exception &exc) { ErrorHandler::handle(exc); }
catch (std::exception &exc) { ErrorHandler::handle(exc); }
catch (...) { ErrorHandler::handle(); }
FastMutex::ScopedLock lock(_mutex);
if (_stopped || (_currentThreads > 1 && _queue.empty()))
{
--_currentThreads;
break;
}
}
}
namespace
{
static const std::string threadName("TCPServerConnection");
static const std::string threadName("TCPServerConnection");
}
void TCPServerDispatcher::enqueue(const StreamSocket& socket)
{
FastMutex::ScopedLock lock(_mutex);
FastMutex::ScopedLock lock(_mutex);
if (_queue.size() < _pParams->getMaxQueued())
{
if (!_queue.hasIdleThreads() && _currentThreads < _pParams->getMaxThreads())
{
try
{
ErrorHandler::logMessage(Message::PRIO_TEST, "Queue size: " + std::to_string(_queue.size()) +
", current threads: " + std::to_string(_currentThreads) +
", threads in pool: " + std::to_string(_threadPool.allocated()) +
", current connections: " + std::to_string(_currentConnections));
if (_queue.size() < _pParams->getMaxQueued())
{
/// NOTE: the condition below is wrong.
/// Since the thread pool is shared between multiple servers/TCPServerDispatchers,
/// _currentThreads < _pParams->getMaxThreads() will be true when the pool is actually saturated.
/// As a result, queue is useless and connections never wait in queue.
/// Instead, we (mistakenly) think that we can create a thread for this connection, but we fail to create it
/// and the connection gets rejected.
/// We could check _currentThreads < _threadPool.allocated() to make it work,
/// but it's not clear if we want to make it work
/// because it may be better to reject connection immediately if we don't have resources to handle it.
if (!_queue.hasIdleThreads() && _currentThreads < _pParams->getMaxThreads())
{
try
{
this->duplicate();
_threadPool.startWithPriority(_pParams->getThreadPriority(), *this, threadName);
++_currentThreads;
}
catch (Poco::Exception& exc)
{
_threadPool.startWithPriority(_pParams->getThreadPriority(), *this, threadName);
++_currentThreads;
}
catch (Poco::Exception& exc)
{
ErrorHandler::logMessage(Message::PRIO_WARNING, "Got an exception while starting thread for connection from " +
socket.peerAddress().toString());
ErrorHandler::handle(exc);
this->release();
++_refusedConnections;
std::cerr << "Got exception while starting thread for connection. Error code: "
<< exc.code() << ", message: '" << exc.displayText() << "'" << std::endl;
return;
}
}
_queue.enqueueNotification(new TCPConnectionNotification(socket));
}
else
{
++_refusedConnections;
}
++_refusedConnections;
return;
}
}
else if (!_queue.hasIdleThreads())
{
ErrorHandler::logMessage(Message::PRIO_TRACE, "Don't have idle threads, adding connection from " +
socket.peerAddress().toString() + " to the queue, size: " + std::to_string(_queue.size()));
}
_queue.enqueueNotification(new TCPConnectionNotification(socket));
}
else
{
ErrorHandler::logMessage(Message::PRIO_WARNING, "Refusing connection from " + socket.peerAddress().toString() +
", reached max queue size " + std::to_string(_pParams->getMaxQueued()));
++_refusedConnections;
}
}
void TCPServerDispatcher::stop()
{
_stopped = true;
_queue.clear();
_queue.wakeUpAll();
_stopped = true;
_queue.clear();
_queue.wakeUpAll();
}
int TCPServerDispatcher::currentThreads() const
{
return _currentThreads;
return _currentThreads;
}
int TCPServerDispatcher::maxThreads() const
{
FastMutex::ScopedLock lock(_mutex);
return _threadPool.capacity();
FastMutex::ScopedLock lock(_mutex);
return _threadPool.capacity();
}
int TCPServerDispatcher::totalConnections() const
{
return _totalConnections;
return _totalConnections;
}
int TCPServerDispatcher::currentConnections() const
{
return _currentConnections;
return _currentConnections;
}
int TCPServerDispatcher::maxConcurrentConnections() const
{
return _maxConcurrentConnections;
return _maxConcurrentConnections;
}
int TCPServerDispatcher::queuedConnections() const
{
return _queue.size();
return _queue.size();
}
int TCPServerDispatcher::refusedConnections() const
{
return _refusedConnections;
return _refusedConnections;
}
void TCPServerDispatcher::beginConnection()
{
FastMutex::ScopedLock lock(_mutex);
FastMutex::ScopedLock lock(_mutex);
++_totalConnections;
++_currentConnections;
if (_currentConnections > _maxConcurrentConnections)
_maxConcurrentConnections.store(_currentConnections);
++_totalConnections;
++_currentConnections;
if (_currentConnections > _maxConcurrentConnections)
_maxConcurrentConnections.store(_currentConnections);
}
void TCPServerDispatcher::endConnection()
{
--_currentConnections;
--_currentConnections;
}

View File

@ -146,6 +146,7 @@ Code: 48. DB::Exception: Received from localhost:9000. DB::Exception: Reading fr
- `_file` — Name of the file. Type: `LowCardinality(String)`.
- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the size is unknown, the value is `NULL`.
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
- `_etag` — ETag of the file. Type: `LowCardinality(String)`. If the etag is unknown, the value is `NULL`.
For more information about virtual columns see [here](../../../engines/table-engines/index.md#table_engines-virtual_columns).

View File

@ -379,7 +379,7 @@ You can mitigate this problem by enabling `wait_end_of_query=1` ([Response Buffe
However, this does not completely solve the problem because the result must still fit within the `http_response_buffer_size`, and other settings like `send_progress_in_http_headers` can interfere with the delay of the header.
The only way to catch all errors is to analyze the HTTP body before parsing it using the required format.
### Queries with Parameters {#cli-queries-with-parameters}
## Queries with Parameters {#cli-queries-with-parameters}
You can create a query with parameters and pass values for them from the corresponding HTTP request parameters. For more information, see [Queries with Parameters for CLI](../interfaces/cli.md#cli-queries-with-parameters).

View File

@ -241,12 +241,12 @@ CREATE OR REPLACE TABLE test
(
id UInt64,
size_bytes Int64,
size String Alias formatReadableSize(size_bytes)
size String ALIAS formatReadableSize(size_bytes)
)
ENGINE = MergeTree
ORDER BY id;
INSERT INTO test Values (1, 4678899);
INSERT INTO test VALUES (1, 4678899);
SELECT id, size_bytes, size FROM test;
┌─id─┬─size_bytes─┬─size─────┐
@ -497,7 +497,7 @@ If you perform a SELECT query mentioning a specific value in an encrypted column
```sql
CREATE TABLE mytable
(
x String Codec(AES_128_GCM_SIV)
x String CODEC(AES_128_GCM_SIV)
)
ENGINE = MergeTree ORDER BY x;
```

View File

@ -421,7 +421,7 @@ try
std::lock_guard lock(servers_lock);
metrics.reserve(servers->size());
for (const auto & server : *servers)
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()});
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads(), server.refusedConnections()});
return metrics;
}
);

View File

@ -14,6 +14,7 @@
#include <Databases/registerDatabases.h>
#include <Databases/DatabaseFilesystem.h>
#include <Databases/DatabaseMemory.h>
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabasesOverlay.h>
#include <Storages/System/attachSystemTables.h>
#include <Storages/System/attachInformationSchemaTables.h>
@ -50,7 +51,6 @@
#include <Dictionaries/registerDictionaries.h>
#include <Disks/registerDisks.h>
#include <Formats/registerFormats.h>
#include <boost/algorithm/string/replace.hpp>
#include <boost/program_options/options_description.hpp>
#include <base/argsToConfig.h>
#include <filesystem>
@ -216,12 +216,12 @@ static DatabasePtr createMemoryDatabaseIfNotExists(ContextPtr context, const Str
return system_database;
}
static DatabasePtr createClickHouseLocalDatabaseOverlay(const String & name_, ContextPtr context_)
static DatabasePtr createClickHouseLocalDatabaseOverlay(const String & name_, ContextPtr context)
{
auto databaseCombiner = std::make_shared<DatabasesOverlay>(name_, context_);
databaseCombiner->registerNextDatabase(std::make_shared<DatabaseFilesystem>(name_, "", context_));
databaseCombiner->registerNextDatabase(std::make_shared<DatabaseMemory>(name_, context_));
return databaseCombiner;
auto overlay = std::make_shared<DatabasesOverlay>(name_, context);
overlay->registerNextDatabase(std::make_shared<DatabaseAtomic>(name_, fs::weakly_canonical(context->getPath()), UUIDHelpers::generateV4(), context));
overlay->registerNextDatabase(std::make_shared<DatabaseFilesystem>(name_, "", context));
return overlay;
}
/// If path is specified and not empty, will try to setup server environment and load existing metadata
@ -367,7 +367,7 @@ std::string LocalServer::getInitialCreateTableQuery()
else
table_structure = "(" + table_structure + ")";
return fmt::format("CREATE TABLE {} {} ENGINE = File({}, {});",
return fmt::format("CREATE TEMPORARY TABLE {} {} ENGINE = File({}, {});",
table_name, table_structure, data_format, table_file);
}
@ -761,7 +761,12 @@ void LocalServer::processConfig()
DatabaseCatalog::instance().initializeAndLoadTemporaryDatabase();
std::string default_database = server_settings.default_database;
DatabaseCatalog::instance().attachDatabase(default_database, createClickHouseLocalDatabaseOverlay(default_database, global_context));
{
DatabasePtr database = createClickHouseLocalDatabaseOverlay(default_database, global_context);
if (UUID uuid = database->getUUID(); uuid != UUIDHelpers::Nil)
DatabaseCatalog::instance().addUUIDMapping(uuid);
DatabaseCatalog::instance().attachDatabase(default_database, database);
}
global_context->setCurrentDatabase(default_database);
if (getClientConfiguration().has("path"))

View File

@ -918,10 +918,10 @@ try
metrics.reserve(servers_to_start_before_tables.size() + servers.size());
for (const auto & server : servers_to_start_before_tables)
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()});
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads(), server.refusedConnections()});
for (const auto & server : servers)
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()});
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads(), server.refusedConnections()});
return metrics;
}
);

View File

@ -1613,7 +1613,7 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
#endif
{
auto get_metric_name_doc = [](const String & name) -> std::pair<const char *, const char *>
auto threads_get_metric_name_doc = [](const String & name) -> std::pair<const char *, const char *>
{
static std::map<String, std::pair<const char *, const char *>> metric_map =
{
@ -1637,11 +1637,38 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
return it->second;
};
/// Maps a protocol server's port name to the {metric name, description} pair of its
/// rejected-connections metric. Port names that are not in the table yield {nullptr, nullptr}.
auto rejected_connections_get_metric_name_doc = [](const String & name) -> std::pair<const char *, const char *>
{
    using NameAndDoc = std::pair<const char *, const char *>;

    /// Built once on first use and shared by all subsequent calls.
    static std::map<String, NameAndDoc> metric_map =
    {
        {"tcp_port", {"TCPRejectedConnections", "Number of rejected connections for the TCP protocol (without TLS)."}},
        {"tcp_port_secure", {"TCPSecureRejectedConnections", "Number of rejected connections for the TCP protocol (with TLS)."}},
        {"http_port", {"HTTPRejectedConnections", "Number of rejected connections for the HTTP interface (without TLS)."}},
        {"https_port", {"HTTPSecureRejectedConnections", "Number of rejected connections for the HTTPS interface."}},
        {"interserver_http_port", {"InterserverRejectedConnections", "Number of rejected connections for the replicas communication protocol (without TLS)."}},
        {"interserver_https_port", {"InterserverSecureRejectedConnections", "Number of rejected connections for the replicas communication protocol (with TLS)."}},
        {"mysql_port", {"MySQLRejectedConnections", "Number of rejected connections for the MySQL compatibility protocol."}},
        {"postgresql_port", {"PostgreSQLRejectedConnections", "Number of rejected connections for the PostgreSQL compatibility protocol."}},
        {"grpc_port", {"GRPCRejectedConnections", "Number of rejected connections for the GRPC protocol."}},
        {"prometheus.port", {"PrometheusRejectedConnections", "Number of rejected connections for the Prometheus endpoint. Note: prometheus endpoints can be also used via the usual HTTP/HTTPs ports."}},
        {"keeper_server.tcp_port", {"KeeperTCPRejectedConnections", "Number of rejected connections for the Keeper TCP protocol (without TLS)."}},
        {"keeper_server.tcp_port_secure", {"KeeperTCPSecureRejectedConnections", "Number of rejected connections for the Keeper TCP protocol (with TLS)."}}
    };

    if (auto found = metric_map.find(name); found != metric_map.end())
        return found->second;

    return { nullptr, nullptr };
};
const auto server_metrics = protocol_server_metrics_func();
for (const auto & server_metric : server_metrics)
{
if (auto name_doc = get_metric_name_doc(server_metric.port_name); name_doc.first != nullptr)
if (auto name_doc = threads_get_metric_name_doc(server_metric.port_name); name_doc.first != nullptr)
new_values[name_doc.first] = { server_metric.current_threads, name_doc.second };
if (auto name_doc = rejected_connections_get_metric_name_doc(server_metric.port_name); name_doc.first != nullptr)
new_values[name_doc.first] = { server_metric.rejected_connections, name_doc.second };
}
}

View File

@ -42,6 +42,7 @@ struct ProtocolServerMetrics
{
String port_name;
size_t current_threads;
size_t rejected_connections;
};
/** Periodically (by default, each second)

View File

@ -2,6 +2,7 @@
#include <Poco/ErrorHandler.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
/** ErrorHandler for Poco::Thread,
@ -26,8 +27,32 @@ public:
void exception(const std::exception &) override { logException(); }
void exception() override { logException(); }
/// Writes a message received through Poco::ErrorHandler::logMessage to the "Poco" logger
/// (trace_log) at the severity matching `priority`.
///
/// PRIO_FATAL and PRIO_CRITICAL are both logged as fatal; PRIO_NOTICE and
/// PRIO_INFORMATION are both logged as info.
///
/// The text is passed as an argument to a constant "{}" format string instead of being used
/// as the format string itself: `msg` is arbitrary runtime text, and a '{' or '}' inside it
/// would otherwise be parsed as a replacement field, making formatting fail and the
/// message get lost.
void logMessageImpl(Poco::Message::Priority priority, const std::string & msg) override
{
    switch (priority)
    {
        case Poco::Message::PRIO_FATAL: [[fallthrough]];
        case Poco::Message::PRIO_CRITICAL:
            LOG_FATAL(trace_log, "{}", msg); break;
        case Poco::Message::PRIO_ERROR:
            LOG_ERROR(trace_log, "{}", msg); break;
        case Poco::Message::PRIO_WARNING:
            LOG_WARNING(trace_log, "{}", msg); break;
        case Poco::Message::PRIO_NOTICE: [[fallthrough]];
        case Poco::Message::PRIO_INFORMATION:
            LOG_INFO(trace_log, "{}", msg); break;
        case Poco::Message::PRIO_DEBUG:
            LOG_DEBUG(trace_log, "{}", msg); break;
        case Poco::Message::PRIO_TRACE:
            LOG_TRACE(trace_log, "{}", msg); break;
        case Poco::Message::PRIO_TEST:
            LOG_TEST(trace_log, "{}", msg); break;
    }
}
private:
LoggerPtr log = getLogger("ServerErrorHandler");
LoggerPtr trace_log = getLogger("Poco");
void logException()
{

View File

@ -53,9 +53,6 @@ DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, c
, db_uuid(uuid)
{
assert(db_uuid != UUIDHelpers::Nil);
fs::create_directories(fs::path(getContext()->getPath()) / "metadata");
fs::create_directories(path_to_table_symlinks);
tryCreateMetadataSymlink();
}
DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, ContextPtr context_)
@ -63,6 +60,16 @@ DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, C
{
}
/// Lazily creates the directories used by this database: everything
/// DatabaseOnDisk::createDirectories() creates, the "metadata" directory under the context
/// path, the directory for table symlinks, and (best effort) the metadata symlink.
///
/// Only the first call does the work; later calls return immediately.
/// If creation throws, the flag is reset before rethrowing so that the next call retries
/// instead of silently assuming the directories exist.
///
/// NOTE(review): a concurrent caller that finds the flag already set returns right away,
/// possibly before the first caller has finished creating the directories. Closing that
/// window needs a lock or std::call_once, i.e. a new member declared outside this function.
void DatabaseAtomic::createDirectories()
{
    if (database_atomic_directories_created.test_and_set())
        return;

    try
    {
        DatabaseOnDisk::createDirectories();
        fs::create_directories(fs::path(getContext()->getPath()) / "metadata");
        fs::create_directories(path_to_table_symlinks);
        tryCreateMetadataSymlink();
    }
    catch (...)
    {
        /// Do not leave the flag set when the directories were not (fully) created.
        database_atomic_directories_created.clear();
        throw;
    }
}
String DatabaseAtomic::getTableDataPath(const String & table_name) const
{
std::lock_guard lock(mutex);
@ -99,6 +106,7 @@ void DatabaseAtomic::drop(ContextPtr)
void DatabaseAtomic::attachTable(ContextPtr /* context_ */, const String & name, const StoragePtr & table, const String & relative_table_path)
{
assert(relative_table_path != data_path && !relative_table_path.empty());
createDirectories();
DetachedTables not_in_use;
std::lock_guard lock(mutex);
not_in_use = cleanupDetachedTables();
@ -200,11 +208,15 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
if (exchange && !supportsAtomicRename())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "RENAME EXCHANGE is not supported");
createDirectories();
waitDatabaseStarted();
auto & other_db = dynamic_cast<DatabaseAtomic &>(to_database);
bool inside_database = this == &other_db;
if (!inside_database)
other_db.createDirectories();
String old_metadata_path = getObjectMetadataPath(table_name);
String new_metadata_path = to_database.getObjectMetadataPath(to_table_name);
@ -325,6 +337,7 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
const String & table_metadata_tmp_path, const String & table_metadata_path,
ContextPtr query_context)
{
createDirectories();
DetachedTables not_in_use;
auto table_data_path = getTableDataPath(query);
try
@ -461,6 +474,9 @@ void DatabaseAtomic::beforeLoadingMetadata(ContextMutablePtr /*context*/, Loadin
if (mode < LoadingStrictnessLevel::FORCE_RESTORE)
return;
if (!fs::exists(path_to_table_symlinks))
return;
/// Recreate symlinks to table data dirs in case of force restore, because some of them may be broken
for (const auto & table_path : fs::directory_iterator(path_to_table_symlinks))
{
@ -588,6 +604,7 @@ void DatabaseAtomic::renameDatabase(ContextPtr query_context, const String & new
{
/// CREATE, ATTACH, DROP, DETACH and RENAME DATABASE must hold DDLGuard
createDirectories();
waitDatabaseStarted();
bool check_ref_deps = query_context->getSettingsRef().check_referential_table_dependencies;
@ -679,4 +696,5 @@ void registerDatabaseAtomic(DatabaseFactory & factory)
};
factory.registerDatabase("Atomic", create_fn);
}
}

View File

@ -76,6 +76,9 @@ protected:
using DetachedTables = std::unordered_map<UUID, StoragePtr>;
[[nodiscard]] DetachedTables cleanupDetachedTables() TSA_REQUIRES(mutex);
std::atomic_flag database_atomic_directories_created = ATOMIC_FLAG_INIT;
void createDirectories();
void tryCreateMetadataSymlink();
virtual bool allowMoveTableToOtherDatabaseEngine(IDatabase & /*to_database*/) const { return false; }

View File

@ -47,12 +47,13 @@ DatabaseLazy::DatabaseLazy(const String & name_, const String & metadata_path_,
: DatabaseOnDisk(name_, metadata_path_, std::filesystem::path("data") / escapeForFileName(name_) / "", "DatabaseLazy (" + name_ + ")", context_)
, expiration_time(expiration_time_)
{
createDirectories();
}
void DatabaseLazy::loadStoredObjects(ContextMutablePtr local_context, LoadingStrictnessLevel /*mode*/)
{
iterateMetadataFiles(local_context, [this, &local_context](const String & file_name)
iterateMetadataFiles([this, &local_context](const String & file_name)
{
const std::string table_name = unescapeForFileName(file_name.substr(0, file_name.size() - 4));

View File

@ -12,7 +12,7 @@ class DatabaseLazyIterator;
class Context;
/** Lazy engine of databases.
* Works like DatabaseOrdinary, but stores in memory only the cache.
* Works like DatabaseOrdinary, but stores only recently accessed tables in memory.
* Can be used only with *Log engines.
*/
class DatabaseLazy final : public DatabaseOnDisk

View File

@ -172,7 +172,14 @@ DatabaseOnDisk::DatabaseOnDisk(
, metadata_path(metadata_path_)
, data_path(data_path_)
{
fs::create_directories(local_context->getPath() + data_path);
}
/// Lazily creates the data directory (under the context path) and the metadata directory of the database.
/// `directories_created` makes every call after the first one return immediately.
void DatabaseOnDisk::createDirectories()
{
    if (directories_created.test_and_set())
        return;

    try
    {
        fs::create_directories(std::filesystem::path(getContext()->getPath()) / data_path);
        fs::create_directories(metadata_path);
    }
    catch (...)
    {
        /// Creation failed: reset the flag so that a later call retries
        /// instead of returning early while the directories are still missing.
        directories_created.clear();
        throw;
    }
}
@ -190,6 +197,8 @@ void DatabaseOnDisk::createTable(
const StoragePtr & table,
const ASTPtr & query)
{
createDirectories();
const auto & settings = local_context->getSettingsRef();
const auto & create = query->as<ASTCreateQuery &>();
assert(table_name == create.getTable());
@ -257,7 +266,6 @@ void DatabaseOnDisk::createTable(
}
commitCreateTable(create, table, table_metadata_tmp_path, table_metadata_path, local_context);
removeDetachedPermanentlyFlag(local_context, table_name, table_metadata_path, false);
}
@ -285,6 +293,8 @@ void DatabaseOnDisk::commitCreateTable(const ASTCreateQuery & query, const Stora
{
try
{
createDirectories();
/// Add a table to the map of known tables.
attachTable(query_context, query.getTable(), table, getTableDataPath(query));
@ -420,6 +430,7 @@ void DatabaseOnDisk::renameTable(
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Moving tables between databases of different engines is not supported");
}
createDirectories();
waitDatabaseStarted();
auto table_data_relative_path = getTableDataPath(table_name);
@ -568,14 +579,14 @@ void DatabaseOnDisk::drop(ContextPtr local_context)
assert(TSA_SUPPRESS_WARNING_FOR_READ(tables).empty());
if (local_context->getSettingsRef().force_remove_data_recursively_on_drop)
{
(void)fs::remove_all(local_context->getPath() + getDataPath());
(void)fs::remove_all(std::filesystem::path(getContext()->getPath()) / data_path);
(void)fs::remove_all(getMetadataPath());
}
else
{
try
{
(void)fs::remove(local_context->getPath() + getDataPath());
(void)fs::remove(std::filesystem::path(getContext()->getPath()) / data_path);
(void)fs::remove(getMetadataPath());
}
catch (const fs::filesystem_error & e)
@ -613,15 +624,18 @@ time_t DatabaseOnDisk::getObjectMetadataModificationTime(const String & object_n
}
}
void DatabaseOnDisk::iterateMetadataFiles(ContextPtr local_context, const IteratingFunction & process_metadata_file) const
void DatabaseOnDisk::iterateMetadataFiles(const IteratingFunction & process_metadata_file) const
{
if (!fs::exists(metadata_path))
return;
auto process_tmp_drop_metadata_file = [&](const String & file_name)
{
assert(getUUID() == UUIDHelpers::Nil);
static const char * tmp_drop_ext = ".sql.tmp_drop";
const std::string object_name = file_name.substr(0, file_name.size() - strlen(tmp_drop_ext));
if (fs::exists(local_context->getPath() + getDataPath() + '/' + object_name))
if (fs::exists(std::filesystem::path(getContext()->getPath()) / data_path / object_name))
{
fs::rename(getMetadataPath() + file_name, getMetadataPath() + object_name + ".sql");
LOG_WARNING(log, "Object {} was not dropped previously and will be restored", backQuote(object_name));
@ -638,7 +652,7 @@ void DatabaseOnDisk::iterateMetadataFiles(ContextPtr local_context, const Iterat
std::vector<std::pair<String, bool>> metadata_files;
fs::directory_iterator dir_end;
for (fs::directory_iterator dir_it(getMetadataPath()); dir_it != dir_end; ++dir_it)
for (fs::directory_iterator dir_it(metadata_path); dir_it != dir_end; ++dir_it)
{
String file_name = dir_it->path().filename();
/// For '.svn', '.gitignore' directory and similar.

View File

@ -64,7 +64,7 @@ public:
time_t getObjectMetadataModificationTime(const String & object_name) const override;
String getDataPath() const override { return data_path; }
String getTableDataPath(const String & table_name) const override { return data_path + escapeForFileName(table_name) + "/"; }
String getTableDataPath(const String & table_name) const override { return std::filesystem::path(data_path) / escapeForFileName(table_name) / ""; }
String getTableDataPath(const ASTCreateQuery & query) const override { return getTableDataPath(query.getTable()); }
String getMetadataPath() const override { return metadata_path; }
@ -83,7 +83,7 @@ protected:
using IteratingFunction = std::function<void(const String &)>;
void iterateMetadataFiles(ContextPtr context, const IteratingFunction & process_metadata_file) const;
void iterateMetadataFiles(const IteratingFunction & process_metadata_file) const;
ASTPtr getCreateTableQueryImpl(
const String & table_name,
@ -99,6 +99,9 @@ protected:
virtual void removeDetachedPermanentlyFlag(ContextPtr context, const String & table_name, const String & table_metadata_path, bool attach);
virtual void setDetachedTableNotInUseForce(const UUID & /*uuid*/) {}
std::atomic_flag directories_created = ATOMIC_FLAG_INIT;
void createDirectories();
const String metadata_path;
const String data_path;
};

View File

@ -55,7 +55,7 @@ static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768;
static constexpr const char * const CONVERT_TO_REPLICATED_FLAG_NAME = "convert_to_replicated";
DatabaseOrdinary::DatabaseOrdinary(const String & name_, const String & metadata_path_, ContextPtr context_)
: DatabaseOrdinary(name_, metadata_path_, "data/" + escapeForFileName(name_) + "/", "DatabaseOrdinary (" + name_ + ")", context_)
: DatabaseOrdinary(name_, metadata_path_, std::filesystem::path("data") / escapeForFileName(name_) / "", "DatabaseOrdinary (" + name_ + ")", context_)
{
}
@ -265,7 +265,7 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
}
};
iterateMetadataFiles(local_context, process_metadata);
iterateMetadataFiles(process_metadata);
size_t objects_in_database = metadata.parsed_tables.size() - prev_tables_count;
size_t dictionaries_in_database = metadata.total_dictionaries - prev_total_dictionaries;

View File

@ -14,6 +14,8 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_TABLE;
}
DatabasesOverlay::DatabasesOverlay(const String & name_, ContextPtr context_)
@ -124,6 +126,39 @@ StoragePtr DatabasesOverlay::detachTable(ContextPtr context_, const String & tab
getEngineName());
}
/// Renames (or exchanges) a table that lives in one of the overlay's member databases.
/// The first member that reports the table as existing performs the rename.
/// Throws BAD_ARGUMENTS if the destination is an Overlay database without members,
/// and UNKNOWN_TABLE if no member contains the table.
void DatabasesOverlay::renameTable(
    ContextPtr current_context,
    const String & name,
    IDatabase & to_database,
    const String & to_name,
    bool exchange,
    bool dictionary)
{
    for (auto & member : databases)
    {
        if (!member->isTableExist(name, current_context))
            continue;

        auto * overlay_destination = typeid_cast<DatabasesOverlay *>(&to_database);
        if (!overlay_destination)
        {
            /// Renaming into a different type of database. E.g. from Overlay on top of Atomic database into just Atomic database.
            member->renameTable(current_context, name, to_database, to_name, exchange, dictionary);
            return;
        }

        /// Renaming from Overlay database inside itself or into another Overlay database.
        /// Just use the first database in the overlay as a destination.
        if (overlay_destination->databases.empty())
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "The destination Overlay database {} does not have any members", to_database.getDatabaseName());

        member->renameTable(current_context, name, *overlay_destination->databases[0], to_name, exchange, dictionary);
        return;
    }

    throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist", backQuote(getDatabaseName()), backQuote(name));
}
ASTPtr DatabasesOverlay::getCreateTableQueryImpl(const String & name, ContextPtr context_, bool throw_on_error) const
{
ASTPtr result = nullptr;
@ -178,6 +213,18 @@ String DatabasesOverlay::getTableDataPath(const ASTCreateQuery & query) const
return result;
}
/// Returns the UUID of the first member database that has a non-Nil UUID,
/// or UUIDHelpers::Nil if there is no such member (including an empty overlay).
UUID DatabasesOverlay::getUUID() const
{
    for (const auto & member : databases)
    {
        if (UUID candidate = member->getUUID(); candidate != UUIDHelpers::Nil)
            return candidate;
    }
    return UUIDHelpers::Nil;
}
UUID DatabasesOverlay::tryGetTableUUID(const String & table_name) const
{
UUID result = UUIDHelpers::Nil;

View File

@ -35,12 +35,21 @@ public:
StoragePtr detachTable(ContextPtr context, const String & table_name) override;
void renameTable(
ContextPtr current_context,
const String & name,
IDatabase & to_database,
const String & to_name,
bool exchange,
bool dictionary) override;
ASTPtr getCreateTableQueryImpl(const String & name, ContextPtr context, bool throw_on_error) const override;
ASTPtr getCreateDatabaseQuery() const override;
String getTableDataPath(const String & table_name) const override;
String getTableDataPath(const ASTCreateQuery & query) const override;
UUID getUUID() const override;
UUID tryGetTableUUID(const String & table_name) const override;
void drop(ContextPtr context) override;

View File

@ -416,6 +416,7 @@ public:
std::lock_guard lock{mutex};
return database_name;
}
/// Get UUID of database.
virtual UUID getUUID() const { return UUIDHelpers::Nil; }

View File

@ -46,6 +46,7 @@ DatabaseMaterializedMySQL::DatabaseMaterializedMySQL(
, settings(std::move(settings_))
, materialize_thread(context_, database_name_, mysql_database_name_, std::move(pool_), std::move(client_), binlog_client_, settings.get())
{
createDirectories();
}
void DatabaseMaterializedMySQL::rethrowExceptionIfNeeded() const

View File

@ -86,6 +86,7 @@ private:
Poco::Timestamp::fromEpochTime(
std::chrono::duration_cast<std::chrono::seconds>(
static_cast<std::chrono::system_clock::time_point>(blob.Details.LastModified).time_since_epoch()).count()),
blob.Details.ETag.ToString(),
{}}));
}
@ -186,6 +187,7 @@ void AzureObjectStorage::listObjects(const std::string & path, RelativePathsWith
Poco::Timestamp::fromEpochTime(
std::chrono::duration_cast<std::chrono::seconds>(
static_cast<std::chrono::system_clock::time_point>(blob.Details.LastModified).time_since_epoch()).count()),
blob.Details.ETag.ToString(),
{}}));
}

View File

@ -205,7 +205,7 @@ void DiskObjectStorageMetadata::addObject(ObjectStorageKey key, size_t size)
}
total_size += size;
keys_with_meta.emplace_back(std::move(key), ObjectMetadata{size, {}, {}});
keys_with_meta.emplace_back(std::move(key), ObjectMetadata{size, {}, {}, {}});
}
ObjectKeyWithMetadata DiskObjectStorageMetadata::popLastObject()

View File

@ -222,6 +222,7 @@ void HDFSObjectStorage::listObjects(const std::string & path, RelativePathsWithM
ObjectMetadata{
static_cast<uint64_t>(ls.file_info[i].mSize),
Poco::Timestamp::fromEpochTime(ls.file_info[i].mLastMod),
"",
{}}));
}

View File

@ -54,6 +54,7 @@ struct ObjectMetadata
{
uint64_t size_bytes = 0;
Poco::Timestamp last_modified;
std::string etag;
ObjectAttributes attributes;
};

View File

@ -146,7 +146,7 @@ private:
auto objects = outcome.GetResult().GetContents();
for (const auto & object : objects)
{
ObjectMetadata metadata{static_cast<uint64_t>(object.GetSize()), Poco::Timestamp::fromEpochTime(object.GetLastModified().Seconds()), {}};
ObjectMetadata metadata{static_cast<uint64_t>(object.GetSize()), Poco::Timestamp::fromEpochTime(object.GetLastModified().Seconds()), object.GetETag(), {}};
batch.emplace_back(std::make_shared<RelativePathWithMetadata>(object.GetKey(), std::move(metadata)));
}
@ -332,6 +332,7 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet
ObjectMetadata{
static_cast<uint64_t>(object.GetSize()),
Poco::Timestamp::fromEpochTime(object.GetLastModified().Seconds()),
object.GetETag(),
{}}));
if (max_keys)
@ -479,6 +480,7 @@ ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) cons
ObjectMetadata result;
result.size_bytes = object_info.size;
result.last_modified = Poco::Timestamp::fromEpochTime(object_info.last_modification_time);
result.etag = object_info.etag;
result.attributes = object_info.metadata;
return result;

View File

@ -54,6 +54,7 @@ namespace
ObjectInfo object_info;
object_info.size = static_cast<size_t>(result.GetContentLength());
object_info.last_modification_time = result.GetLastModified().Seconds();
object_info.etag = result.GetETag();
if (with_metadata)
object_info.metadata = result.GetMetadata();

View File

@ -15,6 +15,7 @@ struct ObjectInfo
{
size_t size = 0;
time_t last_modification_time = 0;
String etag;
std::map<String, String> metadata = {}; /// Set only if getObjectInfo() is called with `with_metadata = true`.
};

View File

@ -648,10 +648,8 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
}
void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join)
void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_optimize)
{
if (shrink_blocks)
return; /// Already shrunk
Int64 current_memory_usage = getCurrentQueryMemoryUsage();
Int64 query_memory_usage_delta = current_memory_usage - memory_usage_before_adding_blocks;
@ -659,15 +657,21 @@ void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join)
auto max_total_bytes_in_join = table_join->sizeLimits().max_bytes;
/** If accounted data size is more than half of `max_bytes_in_join`
* or query memory consumption growth from the beginning of adding blocks (estimation of memory consumed by join using memory tracker)
* is bigger than half of all memory available for query,
* then shrink stored blocks to fit.
*/
shrink_blocks = (max_total_bytes_in_join && total_bytes_in_join > max_total_bytes_in_join / 2) ||
(max_total_bytes_for_query && query_memory_usage_delta > max_total_bytes_for_query / 2);
if (!shrink_blocks)
return;
if (!force_optimize)
{
if (shrink_blocks)
return; /// Already shrunk
/** If accounted data size is more than half of `max_bytes_in_join`
* or query memory consumption growth from the beginning of adding blocks (estimation of memory consumed by join using memory tracker)
* is bigger than half of all memory available for query,
* then shrink stored blocks to fit.
*/
shrink_blocks = (max_total_bytes_in_join && total_bytes_in_join > max_total_bytes_in_join / 2) ||
(max_total_bytes_for_query && query_memory_usage_delta > max_total_bytes_for_query / 2);
if (!shrink_blocks)
return;
}
LOG_DEBUG(log, "Shrinking stored blocks, memory consumption is {} {} calculated by join, {} {} by memory tracker",
ReadableSize(total_bytes_in_join), max_total_bytes_in_join ? fmt::format("/ {}", ReadableSize(max_total_bytes_in_join)) : "",

View File

@ -372,7 +372,7 @@ public:
void debugKeys() const;
void shrinkStoredBlocksToFit(size_t & total_bytes_in_join);
void shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_optimize = false);
void setMaxJoinedBlockRows(size_t value) { max_joined_block_rows = value; }

View File

@ -27,7 +27,6 @@ class ASTQueryWithTableAndOutput;
class ASTTableIdentifier;
class Context;
// TODO(ilezhankin): refactor and merge |ASTTableIdentifier|
struct StorageID
{
String database_name;

View File

@ -20,6 +20,7 @@ public:
UInt16 portNumber() const override { return tcp_server->portNumber(); }
size_t currentConnections() const override { return tcp_server->currentConnections(); }
size_t currentThreads() const override { return tcp_server->currentThreads(); }
size_t refusedConnections() const override { return tcp_server->refusedConnections(); }
private:
std::unique_ptr<TCPServer> tcp_server;
@ -54,6 +55,7 @@ public:
UInt16 portNumber() const override { return grpc_server->portNumber(); }
size_t currentConnections() const override { return grpc_server->currentConnections(); }
size_t currentThreads() const override { return grpc_server->currentThreads(); }
size_t refusedConnections() const override { return 0; }
private:
std::unique_ptr<GRPCServer> grpc_server;

View File

@ -38,6 +38,8 @@ public:
/// Returns the number of currently handled connections.
size_t currentConnections() const { return impl->currentConnections(); }
/// Returns the refused connections count reported by the underlying server implementation.
size_t refusedConnections() const { return impl->refusedConnections(); }
/// Returns the number of current threads.
size_t currentThreads() const { return impl->currentThreads(); }
@ -61,6 +63,7 @@ private:
virtual UInt16 portNumber() const = 0;
virtual size_t currentConnections() const = 0;
virtual size_t currentThreads() const = 0;
virtual size_t refusedConnections() const = 0;
};
class TCPServerAdapterImpl;
class GRPCServerAdapterImpl;

View File

@ -204,7 +204,9 @@ Chunk StorageObjectStorageSource::generate()
{.path = getUniqueStoragePathIdentifier(*configuration, *object_info, false),
.size = object_info->isArchive() ? object_info->fileSizeInArchive() : object_info->metadata->size_bytes,
.filename = &filename,
.last_modified = object_info->metadata->last_modified});
.last_modified = object_info->metadata->last_modified,
.etag = &(object_info->metadata->etag)
});
const auto & partition_columns = configuration->getPartitionColumns();
if (!partition_columns.empty() && chunk_size && chunk.hasColumns())

View File

@ -75,6 +75,7 @@ StorageJoin::StorageJoin(
table_join = std::make_shared<TableJoin>(limits, use_nulls, kind, strictness, key_names);
join = std::make_shared<HashJoin>(table_join, getRightSampleBlock(), overwrite);
restore();
optimizeUnlocked();
}
RWLockImpl::LockHolder StorageJoin::tryLockTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context) const
@ -99,6 +100,47 @@ SinkToStoragePtr StorageJoin::write(const ASTPtr & query, const StorageMetadataP
return StorageSetOrJoinBase::write(query, metadata_snapshot, context, /*async_insert=*/false);
}
/// OPTIMIZE for a Join table: shrinks the stored blocks of the underlying join.
/// PARTITION, FINAL, DEDUPLICATE and CLEANUP are rejected with NOT_IMPLEMENTED.
/// Takes the mutation mutex and then the table write lock before optimizing; returns true on success.
bool StorageJoin::optimize(
    const ASTPtr & /*query*/,
    const StorageMetadataPtr & /*metadata_snapshot*/,
    const ASTPtr & partition,
    bool final,
    bool deduplicate,
    const Names & /* deduplicate_by_columns */,
    bool cleanup,
    ContextPtr context)
{
    /// Reject every modifier that has no meaning for this engine (checked in this exact order).
    if (partition)
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Partition cannot be specified when optimizing table of type Join");
    }

    if (final)
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "FINAL cannot be specified when optimizing table of type Join");
    }

    if (deduplicate)
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "DEDUPLICATE cannot be specified when optimizing table of type Join");
    }

    if (cleanup)
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "CLEANUP cannot be specified when optimizing table of type Join");
    }

    std::lock_guard mutation_guard(mutate_mutex);
    TableLockHolder write_lock = tryLockTimedWithContext(rwlock, RWLockImpl::Write, context);

    optimizeUnlocked();
    return true;
}
/// Forces the join to shrink its stored blocks and logs the size change if the byte count went down.
/// Takes no locks itself; NOTE(review): presumably callers provide the locking where needed - confirm.
void StorageJoin::optimizeUnlocked()
{
    const size_t bytes_before = join->getTotalByteCount();

    /// shrinkStoredBlocksToFit takes the byte count by non-const reference, so hand it a scratch copy.
    size_t scratch_bytes = bytes_before;
    join->shrinkStoredBlocksToFit(scratch_bytes, /*force_optimize=*/ true);

    const size_t bytes_after = join->getTotalByteCount();
    if (bytes_after < bytes_before)
        LOG_INFO(getLogger("StorageJoin"), "Optimized Join storage from {} to {} bytes", bytes_before, bytes_after);
}
void StorageJoin::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr context, TableExclusiveLockHolder &)
{
std::lock_guard mutate_lock(mutate_mutex);

View File

@ -61,6 +61,18 @@ public:
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, bool async_insert) override;
bool optimize(
const ASTPtr & /*query*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
const ASTPtr & /*partition*/,
bool /*final*/,
bool /*deduplicate*/,
const Names & /* deduplicate_by_columns */,
bool /*cleanup*/,
ContextPtr /*context*/) override;
void optimizeUnlocked();
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,

View File

@ -116,7 +116,7 @@ void filterBlockWithExpression(const ExpressionActionsPtr & actions, Block & blo
NameSet getVirtualNamesForFileLikeStorage()
{
return {"_path", "_file", "_size", "_time"};
return {"_path", "_file", "_size", "_time", "_etag"};
}
VirtualColumnsDescription getVirtualsForFileLikeStorage(const ColumnsDescription & storage_columns)
@ -135,6 +135,7 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(const ColumnsDescription
add_virtual("_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
add_virtual("_size", makeNullable(std::make_shared<DataTypeUInt64>()));
add_virtual("_time", makeNullable(std::make_shared<DataTypeDateTime>()));
add_virtual("_etag", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
return desc;
}
@ -230,6 +231,13 @@ void addRequestedFileLikeStorageVirtualsToChunk(
else
chunk.addColumn(virtual_column.type->createColumnConstWithDefaultValue(chunk.getNumRows())->convertToFullColumnIfConst());
}
else if (virtual_column.name == "_etag")
{
if (virtual_values.etag)
chunk.addColumn(virtual_column.type->createColumnConst(chunk.getNumRows(), (*virtual_values.etag))->convertToFullColumnIfConst());
else
chunk.addColumn(virtual_column.type->createColumnConstWithDefaultValue(chunk.getNumRows())->convertToFullColumnIfConst());
}
}
}

View File

@ -83,7 +83,7 @@ struct VirtualsForFileLikeStorage
std::optional<size_t> size { std::nullopt };
const String * filename { nullptr };
std::optional<Poco::Timestamp> last_modified { std::nullopt };
const String * etag { nullptr };
};
void addRequestedFileLikeStorageVirtualsToChunk(

View File

@ -1,7 +1,7 @@
version: '2.3'
services:
hdfs1:
image: sequenceiq/hadoop-docker:2.7.0
image: prasanthj/docker-hadoop:2.6.0
hostname: hdfs1
restart: always
expose:

View File

@ -89,3 +89,5 @@ def test_grant_current_database_on_cluster():
assert ch1.query("SHOW DATABASES", user="test_user") == "user_db\n"
ch1.query("GRANT SELECT ON * TO test_user ON CLUSTER 'cluster'", user="test_user")
assert ch1.query("SHOW DATABASES", user="test_user") == "user_db\n"
ch1.query("DROP DATABASE user_db ON CLUSTER 'cluster'")
ch1.query("DROP USER test_user ON CLUSTER 'cluster'")

View File

@ -65,8 +65,8 @@ def test_ignore_obsolete_grant_on_database():
"-c",
f"""
cat > /var/lib/clickhouse/access/{user_id}.sql << EOF
ATTACH USER X;
ATTACH GRANT CREATE FUNCTION, SELECT ON mydb.* TO X;
ATTACH USER \`{user_id}\`;
ATTACH GRANT CREATE FUNCTION, SELECT ON mydb.* TO \`{user_id}\`;
EOF""",
]
)
@ -76,4 +76,9 @@ EOF""",
)
instance.start_clickhouse()
assert instance.query("SHOW GRANTS FOR X") == "GRANT SELECT ON mydb.* TO X\n"
assert (
instance.query(f"SHOW GRANTS FOR `{user_id}`")
== f"GRANT SELECT ON mydb.* TO `{user_id}`\n"
)
instance.stop_clickhouse()
instance.start_clickhouse()

View File

@ -59,3 +59,4 @@ def test_comment(started_cluster):
expected = "CREATE TABLE default.test_table (`id` Int64 COMMENT \\'column_comment_2\\') ENGINE = ReplicatedMergeTree(\\'/clickhouse/tables/{uuid}/{shard}\\', \\'{replica}\\') ORDER BY id SETTINGS index_granularity = 8192 COMMENT \\'table_comment_2\\'"
assert_create_query([node_1, node_2], "default", "test_table", expected)
node_1.query("DROP TABLE test_table ON CLUSTER 'cluster'")

View File

@ -36,6 +36,16 @@ def cluster():
cluster.shutdown()
def drop_table(node, table_name, replicated):
    """Drop `table_name` on `node` with `DROP TABLE ... SYNC`.

    For replicated tables the query is issued through `query_with_retry`,
    otherwise through a plain `query`.
    """
    drop_table_statement = f"DROP TABLE {table_name} SYNC"

    if replicated:
        node.query_with_retry(drop_table_statement)
    else:
        node.query(drop_table_statement)
def create_table(node, table_name, replicated, additional_settings):
settings = {
"storage_policy": "two_disks",
@ -158,6 +168,9 @@ def test_alter_moving(
assert data_digest == "1000\n"
for node in nodes:
drop_table(node, table_name, replicated_engine)
def test_delete_race_leftovers(cluster):
"""
@ -248,3 +261,4 @@ def test_delete_race_leftovers(cluster):
# Check that we have all data
assert table_digest == node.query(table_digest_query)
drop_table(node, table_name, replicated=True)

View File

@ -88,6 +88,9 @@ def test_alter_on_cluter_non_replicated(started_cluster):
assert node3.query("SELECT COUNT() FROM test_table") == "2\n"
assert node4.query("SELECT COUNT() FROM test_table") == "2\n"
for node in [node1, node2, node3, node4]:
node.query("TRUNCATE TABLE test_table")
def test_alter_replicated_on_cluster(started_cluster):
for node in [node1, node3]:
@ -133,3 +136,6 @@ def test_alter_replicated_on_cluster(started_cluster):
assert node2.query("SELECT COUNT() FROM test_table_replicated") == "2\n"
assert node3.query("SELECT COUNT() FROM test_table_replicated") == "2\n"
assert node4.query("SELECT COUNT() FROM test_table_replicated") == "2\n"
for node in [node1, node2, node3, node4]:
node.query("TRUNCATE TABLE test_table_replicated")

View File

@ -1,6 +1,7 @@
import os
import pytest
import uuid
import time
from helpers.cluster import ClickHouseCluster, is_arm
from helpers.test_tools import TSV
@ -31,13 +32,15 @@ def started_cluster():
def test_read_write_storage(started_cluster):
id = uuid.uuid4()
hdfs_api = started_cluster.hdfs_api
filename = f"simple_storage_{id}"
node1.query("drop table if exists SimpleHDFSStorage SYNC")
node1.query(
"create table SimpleHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/simple_storage', 'TSV')"
f"create table SimpleHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/{filename}', 'TSV')"
)
node1.query("insert into SimpleHDFSStorage values (1, 'Mark', 72.53)")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
assert hdfs_api.read_data(f"/{filename}") == "1\tMark\t72.53\n"
assert node1.query("select * from SimpleHDFSStorage") == "1\tMark\t72.53\n"
@ -92,6 +95,11 @@ def test_read_write_storage_with_globs(started_cluster):
print(ex)
assert "in readonly mode" in str(ex)
node1.query("drop table HDFSStorageWithRange")
node1.query("drop table HDFSStorageWithEnum")
node1.query("drop table HDFSStorageWithQuestionMark")
node1.query("drop table HDFSStorageWithAsterisk")
def test_storage_with_multidirectory_glob(started_cluster):
hdfs_api = started_cluster.hdfs_api
@ -137,7 +145,6 @@ def test_read_write_table(started_cluster):
def test_write_table(started_cluster):
hdfs_api = started_cluster.hdfs_api
node1.query(
"create table OtherHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/other_storage', 'TSV')"
)
@ -148,6 +155,8 @@ def test_write_table(started_cluster):
result = "10\ttomas\t55.55\n11\tjack\t32.54\n"
assert hdfs_api.read_data("/other_storage") == result
assert node1.query("select * from OtherHDFSStorage order by id") == result
node1.query("truncate table OtherHDFSStorage")
node1.query("drop table OtherHDFSStorage")
def test_bad_hdfs_uri(started_cluster):
@ -166,6 +175,7 @@ def test_bad_hdfs_uri(started_cluster):
print(ex)
assert "Unable to connect to HDFS" in str(ex)
node1.query("drop table BadStorage2")
try:
node1.query(
"create table BadStorage3 (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/<>', 'TSV')"
@ -173,6 +183,7 @@ def test_bad_hdfs_uri(started_cluster):
except Exception as ex:
print(ex)
assert "Unable to open HDFS file" in str(ex)
node1.query("drop table BadStorage3")
@pytest.mark.timeout(800)
@ -304,6 +315,8 @@ def test_write_gz_storage(started_cluster):
node1.query("insert into GZHDFSStorage values (1, 'Mark', 72.53)")
assert hdfs_api.read_gzip_data("/storage.gz") == "1\tMark\t72.53\n"
assert node1.query("select * from GZHDFSStorage") == "1\tMark\t72.53\n"
node1.query("truncate table GZHDFSStorage")
node1.query("drop table GZHDFSStorage")
def test_write_gzip_storage(started_cluster):
@ -315,6 +328,8 @@ def test_write_gzip_storage(started_cluster):
node1.query("insert into GZIPHDFSStorage values (1, 'Mark', 72.53)")
assert hdfs_api.read_gzip_data("/gzip_storage") == "1\tMark\t72.53\n"
assert node1.query("select * from GZIPHDFSStorage") == "1\tMark\t72.53\n"
node1.query("truncate table GZIPHDFSStorage")
node1.query("drop table GZIPHDFSStorage")
def test_virtual_columns(started_cluster):
@ -333,6 +348,7 @@ def test_virtual_columns(started_cluster):
)
== expected
)
node1.query("drop table virtual_cols")
def test_read_files_with_spaces(started_cluster):
@ -354,6 +370,7 @@ def test_read_files_with_spaces(started_cluster):
)
assert node1.query("select * from test order by id") == "1\n2\n3\n"
fs.delete(dir, recursive=True)
node1.query(f"drop table test")
def test_truncate_table(started_cluster):
@ -375,47 +392,54 @@ def test_truncate_table(started_cluster):
def test_partition_by(started_cluster):
hdfs_api = started_cluster.hdfs_api
fs = HdfsClient(hosts=started_cluster.hdfs_ip)
id = uuid.uuid4()
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
dir = f"partition_{id}"
fs.mkdirs(f"/{dir}/", permission=777)
file_name = "test_{_partition_id}"
partition_by = "column3"
values = "(1, 2, 3), (3, 2, 1), (1, 3, 2)"
table_function = f"hdfs('hdfs://hdfs1:9000/{file_name}', 'TSV', '{table_format}')"
table_function = (
f"hdfs('hdfs://hdfs1:9000/{dir}/{file_name}', 'TSV', '{table_format}')"
)
node1.query(
f"insert into table function {table_function} PARTITION BY {partition_by} values {values}"
)
result = node1.query(
f"select * from hdfs('hdfs://hdfs1:9000/test_1', 'TSV', '{table_format}')"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test_1', 'TSV', '{table_format}')"
)
assert result.strip() == "3\t2\t1"
result = node1.query(
f"select * from hdfs('hdfs://hdfs1:9000/test_2', 'TSV', '{table_format}')"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test_2', 'TSV', '{table_format}')"
)
assert result.strip() == "1\t3\t2"
result = node1.query(
f"select * from hdfs('hdfs://hdfs1:9000/test_3', 'TSV', '{table_format}')"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test_3', 'TSV', '{table_format}')"
)
assert result.strip() == "1\t2\t3"
file_name = "test2_{_partition_id}"
node1.query(
f"create table p(column1 UInt32, column2 UInt32, column3 UInt32) engine = HDFS('hdfs://hdfs1:9000/{file_name}', 'TSV') partition by column3"
f"create table p(column1 UInt32, column2 UInt32, column3 UInt32) engine = HDFS('hdfs://hdfs1:9000/{dir}/{file_name}', 'TSV') partition by column3"
)
node1.query(f"insert into p values {values}")
result = node1.query(
f"select * from hdfs('hdfs://hdfs1:9000/test2_1', 'TSV', '{table_format}')"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test2_1', 'TSV', '{table_format}')"
)
assert result.strip() == "3\t2\t1"
result = node1.query(
f"select * from hdfs('hdfs://hdfs1:9000/test2_2', 'TSV', '{table_format}')"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test2_2', 'TSV', '{table_format}')"
)
assert result.strip() == "1\t3\t2"
result = node1.query(
f"select * from hdfs('hdfs://hdfs1:9000/test2_3', 'TSV', '{table_format}')"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test2_3', 'TSV', '{table_format}')"
)
assert result.strip() == "1\t2\t3"
node1.query(f"drop table p")
# The path must be an f-string: without the prefix the literal "/{dir}" is deleted
# and the per-test directory created at the top of the test is left behind in HDFS.
fs.delete(f"/{dir}", recursive=True)
def test_seekable_formats(started_cluster):
@ -425,7 +449,7 @@ def test_seekable_formats(started_cluster):
f"hdfs('hdfs://hdfs1:9000/parquet', 'Parquet', 'a Int32, b String')"
)
node1.query(
f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)"
f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000) SETTINGS hdfs_truncate_on_insert=1"
)
result = node1.query(f"SELECT count() FROM {table_function}")
@ -433,7 +457,7 @@ def test_seekable_formats(started_cluster):
table_function = f"hdfs('hdfs://hdfs1:9000/orc', 'ORC', 'a Int32, b String')"
node1.query(
f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)"
f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000) SETTINGS hdfs_truncate_on_insert=1"
)
result = node1.query(f"SELECT count() FROM {table_function}")
assert int(result) == 5000000
@ -457,7 +481,7 @@ def test_read_table_with_default(started_cluster):
def test_schema_inference(started_cluster):
node1.query(
f"insert into table function hdfs('hdfs://hdfs1:9000/native', 'Native', 'a Int32, b String') SELECT number, randomString(100) FROM numbers(5000000)"
f"insert into table function hdfs('hdfs://hdfs1:9000/native', 'Native', 'a Int32, b String') SELECT number, randomString(100) FROM numbers(5000000) SETTINGS hdfs_truncate_on_insert=1"
)
result = node1.query(f"desc hdfs('hdfs://hdfs1:9000/native', 'Native')")
@ -476,6 +500,7 @@ def test_schema_inference(started_cluster):
result = node1.query(f"select count(*) from schema_inference")
assert int(result) == 5000000
node1.query(f"drop table schema_inference")
def test_hdfsCluster(started_cluster):
@ -510,6 +535,7 @@ def test_hdfs_directory_not_exist(started_cluster):
assert "" == node1.query(
"select * from HDFSStorageWithNotExistDir settings hdfs_ignore_file_doesnt_exist=1"
)
node1.query("drop table HDFSStorageWithNotExistDir")
def test_overwrite(started_cluster):
@ -529,12 +555,16 @@ def test_overwrite(started_cluster):
result = node1.query(f"select count() from test_overwrite")
assert int(result) == 10
node1.query(f"truncate table test_overwrite")
node1.query(f"drop table test_overwrite")
def test_multiple_inserts(started_cluster):
hdfs_api = started_cluster.hdfs_api
fs = HdfsClient(hosts=started_cluster.hdfs_ip)
id = uuid.uuid4()
fs.mkdirs(f"/{id}/", permission=777)
table_function = f"hdfs('hdfs://hdfs1:9000/data_multiple_inserts', 'Parquet', 'a Int32, b String')"
table_function = f"hdfs('hdfs://hdfs1:9000/{id}/data_multiple_inserts', 'Parquet', 'a Int32, b String')"
node1.query(f"create table test_multiple_inserts as {table_function}")
node1.query(
f"insert into test_multiple_inserts select number, randomString(100) from numbers(10)"
@ -551,7 +581,7 @@ def test_multiple_inserts(started_cluster):
result = node1.query(f"drop table test_multiple_inserts")
table_function = f"hdfs('hdfs://hdfs1:9000/data_multiple_inserts.gz', 'Parquet', 'a Int32, b String')"
table_function = f"hdfs('hdfs://hdfs1:9000/{id}/data_multiple_inserts.gz', 'Parquet', 'a Int32, b String')"
node1.query(f"create table test_multiple_inserts as {table_function}")
node1.query(
f"insert into test_multiple_inserts select number, randomString(100) FROM numbers(10)"
@ -565,6 +595,7 @@ def test_multiple_inserts(started_cluster):
result = node1.query(f"select count() from test_multiple_inserts")
assert int(result) == 60
node1.query(f"drop table test_multiple_inserts")
def test_format_detection(started_cluster):
@ -574,50 +605,58 @@ def test_format_detection(started_cluster):
node1.query(f"insert into arrow_table select 1")
result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/data.arrow')")
assert int(result) == 1
node1.query(f"truncate table arrow_table")
node1.query(f"drop table arrow_table")
def test_schema_inference_with_globs(started_cluster):
fs = HdfsClient(hosts=started_cluster.hdfs_ip)
dir = "/test_schema_inference_with_globs"
fs.mkdirs(dir)
node1.query(
f"insert into table function hdfs('hdfs://hdfs1:9000/data1.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select NULL"
f"insert into table function hdfs('hdfs://hdfs1:9000{dir}/data1.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select NULL"
)
node1.query(
f"insert into table function hdfs('hdfs://hdfs1:9000/data2.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select 0"
f"insert into table function hdfs('hdfs://hdfs1:9000{dir}/data2.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select 0"
)
result = node1.query(
f"desc hdfs('hdfs://hdfs1:9000/data*.jsoncompacteachrow') settings input_format_json_infer_incomplete_types_as_strings=0"
f"desc hdfs('hdfs://hdfs1:9000{dir}/data*.jsoncompacteachrow') settings input_format_json_infer_incomplete_types_as_strings=0"
)
assert result.strip() == "c1\tNullable(Int64)"
result = node1.query(
f"select * from hdfs('hdfs://hdfs1:9000/data*.jsoncompacteachrow') settings input_format_json_infer_incomplete_types_as_strings=0"
f"select * from hdfs('hdfs://hdfs1:9000{dir}/data*.jsoncompacteachrow') settings input_format_json_infer_incomplete_types_as_strings=0"
)
assert sorted(result.split()) == ["0", "\\N"]
node1.query(
f"insert into table function hdfs('hdfs://hdfs1:9000/data3.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select NULL"
f"insert into table function hdfs('hdfs://hdfs1:9000{dir}/data3.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select NULL"
)
filename = "data{1,3}.jsoncompacteachrow"
result = node1.query_and_get_error(
f"desc hdfs('hdfs://hdfs1:9000/{filename}') settings schema_inference_use_cache_for_hdfs=0, input_format_json_infer_incomplete_types_as_strings=0"
f"desc hdfs('hdfs://hdfs1:9000{dir}/{filename}') settings schema_inference_use_cache_for_hdfs=0, input_format_json_infer_incomplete_types_as_strings=0"
)
assert "All attempts to extract table structure from files failed" in result
node1.query(
f"insert into table function hdfs('hdfs://hdfs1:9000/data0.jsoncompacteachrow', 'TSV', 'x String') select '[123;]'"
f"insert into table function hdfs('hdfs://hdfs1:9000{dir}/data0.jsoncompacteachrow', 'TSV', 'x String') select '[123;]'"
)
result = node1.query_and_get_error(
f"desc hdfs('hdfs://hdfs1:9000/data*.jsoncompacteachrow') settings schema_inference_use_cache_for_hdfs=0, input_format_json_infer_incomplete_types_as_strings=0"
f"desc hdfs('hdfs://hdfs1:9000{dir}/data*.jsoncompacteachrow') settings schema_inference_use_cache_for_hdfs=0, input_format_json_infer_incomplete_types_as_strings=0"
)
assert "CANNOT_EXTRACT_TABLE_STRUCTURE" in result
fs.delete(dir, recursive=True)
def test_insert_select_schema_inference(started_cluster):
fs = HdfsClient(hosts=started_cluster.hdfs_ip)
node1.query(
f"insert into table function hdfs('hdfs://hdfs1:9000/test.native.zst') select toUInt64(1) as x"
)
@ -627,6 +666,7 @@ def test_insert_select_schema_inference(started_cluster):
result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/test.native.zst')")
assert int(result) == 1
fs.delete("/test.native.zst")
def test_cluster_join(started_cluster):
@ -658,6 +698,7 @@ def test_cluster_macro(started_cluster):
def test_virtual_columns_2(started_cluster):
hdfs_api = started_cluster.hdfs_api
fs = HdfsClient(hosts=started_cluster.hdfs_ip)
table_function = (
f"hdfs('hdfs://hdfs1:9000/parquet_2', 'Parquet', 'a Int32, b String')"
@ -674,6 +715,8 @@ def test_virtual_columns_2(started_cluster):
result = node1.query(f"SELECT _path FROM {table_function}")
assert result.strip() == "kek"
fs.delete("/parquet_2")
fs.delete("/parquet_3")
def check_profile_event_for_query(node, file, profile_event, amount=1):
@ -967,11 +1010,11 @@ def test_read_subcolumns(started_cluster):
node = started_cluster.instances["node1"]
node.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_subcolumns.tsv', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3)"
f"insert into function hdfs('hdfs://hdfs1:9000/test_subcolumns.tsv', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3) settings hdfs_truncate_on_insert=1"
)
node.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_subcolumns.jsonl', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3)"
f"insert into function hdfs('hdfs://hdfs1:9000/test_subcolumns.jsonl', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3) settings hdfs_truncate_on_insert=1"
)
res = node.query(
@ -1003,7 +1046,7 @@ def test_read_subcolumn_time(started_cluster):
node = started_cluster.instances["node1"]
node.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_subcolumn_time.tsv', auto, 'a UInt32') select (42)"
f"insert into function hdfs('hdfs://hdfs1:9000/test_subcolumn_time.tsv', auto, 'a UInt32') select (42) settings hdfs_truncate_on_insert=1"
)
res = node.query(
@ -1014,91 +1057,103 @@ def test_read_subcolumn_time(started_cluster):
def test_union_schema_inference_mode(started_cluster):
id = uuid.uuid4()
fs = HdfsClient(hosts=started_cluster.hdfs_ip)
dir = f"union_{id}"
fs.mkdirs(f"/{dir}/", permission=777)
node = started_cluster.instances["node1"]
node.query(
"insert into function hdfs('hdfs://hdfs1:9000/test_union_schema_inference1.jsonl') select 1 as a"
f"insert into function hdfs('hdfs://hdfs1:9000/{dir}/test_union_schema_inference1.jsonl') select 1 as a"
)
node.query(
"insert into function hdfs('hdfs://hdfs1:9000/test_union_schema_inference2.jsonl') select 2 as b"
f"insert into function hdfs('hdfs://hdfs1:9000/{dir}/test_union_schema_inference2.jsonl') select 2 as b"
)
node.query("system drop schema cache for hdfs")
result = node.query(
"desc hdfs('hdfs://hdfs1:9000/test_union_schema_inference*.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
f"desc hdfs('hdfs://hdfs1:9000/{dir}/test_union_schema_inference*.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
)
assert result == "a\tNullable(Int64)\nb\tNullable(Int64)\n"
result = node.query(
"select schema_inference_mode, splitByChar('/', source)[-1] as file, schema from system.schema_inference_cache where source like '%test_union_schema_inference%' order by file format TSV"
f"select schema_inference_mode, splitByChar('/', source)[-1] as file, schema from system.schema_inference_cache where source like '%test_union_schema_inference%' order by file format TSV"
)
assert (
result == "UNION\ttest_union_schema_inference1.jsonl\ta Nullable(Int64)\n"
"UNION\ttest_union_schema_inference2.jsonl\tb Nullable(Int64)\n"
)
result = node.query(
"select * from hdfs('hdfs://hdfs1:9000/test_union_schema_inference*.jsonl') order by tuple(*) settings schema_inference_mode='union', describe_compact_output=1 format TSV"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test_union_schema_inference*.jsonl') order by tuple(*) settings schema_inference_mode='union', describe_compact_output=1 format TSV"
)
assert result == "1\t\\N\n" "\\N\t2\n"
node.query(f"system drop schema cache for hdfs")
result = node.query(
"desc hdfs('hdfs://hdfs1:9000/test_union_schema_inference2.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
f"desc hdfs('hdfs://hdfs1:9000/{dir}/test_union_schema_inference2.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
)
assert result == "b\tNullable(Int64)\n"
result = node.query(
"desc hdfs('hdfs://hdfs1:9000/test_union_schema_inference*.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
f"desc hdfs('hdfs://hdfs1:9000/{dir}/test_union_schema_inference*.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
)
assert result == "a\tNullable(Int64)\n" "b\tNullable(Int64)\n"
node.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_union_schema_inference3.jsonl', TSV) select 'Error'"
f"insert into function hdfs('hdfs://hdfs1:9000/{dir}/test_union_schema_inference3.jsonl', TSV) select 'Error'"
)
error = node.query_and_get_error(
"desc hdfs('hdfs://hdfs1:9000/test_union_schema_inference*.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
f"desc hdfs('hdfs://hdfs1:9000/{dir}/test_union_schema_inference*.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
)
assert "CANNOT_EXTRACT_TABLE_STRUCTURE" in error
def test_format_detection(started_cluster):
node = started_cluster.instances["node1"]
fs = HdfsClient(hosts=started_cluster.hdfs_ip)
id = uuid.uuid4()
dir = f"{id}"
fs.mkdirs(f"/{dir}/", permission=777)
node.query(
"insert into function hdfs('hdfs://hdfs1:9000/test_format_detection0', JSONEachRow) select number as x, 'str_' || toString(number) as y from numbers(0)"
f"insert into function hdfs('hdfs://hdfs1:9000/{dir}/test_format_detection0', JSONEachRow) select number as x, 'str_' || toString(number) as y from numbers(0)"
)
node.query(
"insert into function hdfs('hdfs://hdfs1:9000/test_format_detection1', JSONEachRow) select number as x, 'str_' || toString(number) as y from numbers(10)"
f"insert into function hdfs('hdfs://hdfs1:9000/{dir}/test_format_detection1', JSONEachRow) select number as x, 'str_' || toString(number) as y from numbers(10)"
)
expected_desc_result = node.query(
"desc hdfs('hdfs://hdfs1:9000/test_format_detection1', JSONEachRow)"
f"desc hdfs('hdfs://hdfs1:9000/{dir}/test_format_detection1', JSONEachRow)"
)
desc_result = node.query("desc hdfs('hdfs://hdfs1:9000/test_format_detection1')")
desc_result = node.query(
f"desc hdfs('hdfs://hdfs1:9000/{dir}/test_format_detection1')"
)
assert expected_desc_result == desc_result
expected_result = node.query(
"select * from hdfs('hdfs://hdfs1:9000/test_format_detection1', JSONEachRow, 'x UInt64, y String') order by x, y"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test_format_detection1', JSONEachRow, 'x UInt64, y String') order by x, y"
)
result = node.query(
"select * from hdfs('hdfs://hdfs1:9000/test_format_detection1') order by x, y"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test_format_detection1') order by x, y"
)
assert expected_result == result
result = node.query(
"select * from hdfs('hdfs://hdfs1:9000/test_format_detection1', auto, 'x UInt64, y String') order by x, y"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test_format_detection1', auto, 'x UInt64, y String') order by x, y"
)
assert expected_result == result
result = node.query(
"select * from hdfs('hdfs://hdfs1:9000/test_format_detection{0,1}') order by x, y"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test_format_detection{{0,1}}') order by x, y"
)
assert expected_result == result
@ -1106,25 +1161,25 @@ def test_format_detection(started_cluster):
node.query("system drop schema cache for hdfs")
result = node.query(
"select * from hdfs('hdfs://hdfs1:9000/test_format_detection{0,1}') order by x, y"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test_format_detection{{0,1}}') order by x, y"
)
assert expected_result == result
result = node.query(
"select * from hdfsCluster(test_cluster_two_shards, 'hdfs://hdfs1:9000/test_format_detection{0,1}') order by x, y"
f"select * from hdfsCluster(test_cluster_two_shards, 'hdfs://hdfs1:9000/{dir}/test_format_detection{{0,1}}') order by x, y"
)
assert expected_result == result
result = node.query(
"select * from hdfsCluster(test_cluster_two_shards, 'hdfs://hdfs1:9000/test_format_detection{0,1}', auto, auto) order by x, y"
f"select * from hdfsCluster(test_cluster_two_shards, 'hdfs://hdfs1:9000/{dir}/test_format_detection{{0,1}}', auto, auto) order by x, y"
)
assert expected_result == result
result = node.query(
"select * from hdfsCluster(test_cluster_two_shards, 'hdfs://hdfs1:9000/test_format_detection{0,1}', auto, 'x UInt64, y String') order by x, y"
f"select * from hdfsCluster(test_cluster_two_shards, 'hdfs://hdfs1:9000/{dir}/test_format_detection{{0,1}}', auto, 'x UInt64, y String') order by x, y"
)
assert expected_result == result

View File

@ -27,6 +27,7 @@ RENAME DICTIONARY test_01191.t TO test_01191.dict1; -- {serverError INCORRECT_QU
DROP DICTIONARY test_01191.t; -- {serverError INCORRECT_QUERY}
DROP TABLE test_01191.t;
DROP DATABASE IF EXISTS dummy_db;
CREATE DATABASE dummy_db ENGINE=Atomic;
RENAME DICTIONARY test_01191.dict TO dummy_db.dict1;
RENAME DICTIONARY dummy_db.dict1 TO test_01191.dict;

View File

@ -1,2 +1,2 @@
CREATE TABLE default.`table`\n(\n `key` String\n)\nENGINE = File(\'TSVWithNamesAndTypes\', \'/dev/null\')
CREATE TABLE foo.`table`\n(\n `key` String\n)\nENGINE = File(\'TSVWithNamesAndTypes\', \'/dev/null\')
CREATE TEMPORARY TABLE `table`\n(\n `key` String\n)\nENGINE = File(TSVWithNamesAndTypes, \'/dev/null\')
CREATE TEMPORARY TABLE `table`\n(\n `key` String\n)\nENGINE = File(TSVWithNamesAndTypes, \'/dev/null\')

View File

@ -4,5 +4,5 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_LOCAL --file /dev/null --structure "key String" --input-format TSVWithNamesAndTypes --interactive --send_logs_level=trace <<<'show create table table'
$CLICKHOUSE_LOCAL --database foo --file /dev/null --structure "key String" --input-format TSVWithNamesAndTypes --interactive --send_logs_level=trace <<<'show create table table'
$CLICKHOUSE_LOCAL --file /dev/null --structure "key String" --input-format TSVWithNamesAndTypes --interactive --send_logs_level=trace <<<'show create temporary table table'
$CLICKHOUSE_LOCAL --database foo --file /dev/null --structure "key String" --input-format TSVWithNamesAndTypes --interactive --send_logs_level=trace <<<'show create temporary table table'

View File

@ -11,5 +11,5 @@ create table test_02245_2 (a UInt64, _path Int32) engine = S3(s3_conn, filename=
insert into test_02245_2 select 1, 2 settings s3_truncate_on_insert=1;
select * from test_02245_2;
1 2
select _path from test_02245_2;
2
select _path, isNotNull(_etag) from test_02245_2;
2 1

View File

@ -12,4 +12,4 @@ drop table if exists test_02245_2;
create table test_02245_2 (a UInt64, _path Int32) engine = S3(s3_conn, filename='test_02245_2', format=Parquet);
insert into test_02245_2 select 1, 2 settings s3_truncate_on_insert=1;
select * from test_02245_2;
select _path from test_02245_2;
select _path, isNotNull(_etag) from test_02245_2;

View File

@ -9,7 +9,12 @@ while :; do
pid=$!
sleep 1.5
duration="$($CLICKHOUSE_CLIENT -q "select floor(elapsed) from system.processes where current_database = currentDatabase() and query not like '%system.processes%'")"
kill -INT $pid
# The process might not exist at this point in some exceptional situations
# (maybe it was killed by OOM?).
# It is safe to skip this iteration.
if ! kill -INT $pid > /dev/null 2>&1; then
continue
fi
wait
$CLICKHOUSE_CLIENT -q "kill query where current_database = currentDatabase() sync format Null"
if [[ $duration -eq 1 ]]; then

View File

@ -0,0 +1,6 @@
123
Hello
['Hello','world']
Hello
Hello
['Hello','world']

View File

@ -0,0 +1,24 @@
#!/usr/bin/env bash
# Feeds clickhouse-local one multi-statement SQL string that exercises
# CREATE OR REPLACE TABLE, RENAME TABLE and EXCHANGE TABLES, with SELECTs in
# between so the printed rows show which table holds which data at each point.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# All statements are passed as a single double-quoted argument; the text
# contains no shell expansions, so it reaches clickhouse-local verbatim.
${CLICKHOUSE_LOCAL} -n "
CREATE TABLE test (x UInt8) ORDER BY x;
INSERT INTO test VALUES (123);
SELECT * FROM test;
CREATE OR REPLACE TABLE test (s String) ORDER BY s;
INSERT INTO test VALUES ('Hello');
SELECT * FROM test;
RENAME TABLE test TO test2;
CREATE OR REPLACE TABLE test (s Array(String)) ORDER BY s;
INSERT INTO test VALUES (['Hello', 'world']);
SELECT * FROM test;
SELECT * FROM test2;
EXCHANGE TABLES test AND test2;
SELECT * FROM test;
SELECT * FROM test2;
DROP TABLE test;
DROP TABLE test2;
"

View File

@ -0,0 +1,10 @@
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9

View File

@ -0,0 +1,5 @@
-- Creates a Join-engine table, fills it with ten (k, v) rows where v = k,
-- runs OPTIMIZE TABLE on it, then reads the rows back ordered by key.
CREATE TABLE dict_03204 (k UInt64, v UInt64) ENGINE = Join(ANY, LEFT, k);
INSERT INTO dict_03204 SELECT number, number FROM numbers(10);
-- The statement under test: OPTIMIZE issued against a Join-engine table.
OPTIMIZE TABLE dict_03204;
SELECT * FROM dict_03204 ORDER BY k;
-- Clean up so the table does not leak into other tests.
DROP TABLE dict_03204;

View File

@ -247,6 +247,7 @@ DoubleDelta
Doxygen
Durre
ECMA
ETag
Ecto
EdgeAngle
EdgeLengthKm
@ -1587,6 +1588,7 @@ enum's
enums
erfc
errorCodeToName
etag
evalMLMethod
exFAT
expiryMsec