Merge branch 'hive_table' of https://github.com/bigo-sg/ClickHouse into bigo_hive_table
commit 52c118856f
@@ -274,6 +274,7 @@
    M(ThreadPoolReaderPageCacheMissElapsedMicroseconds, "Time spent reading data inside the asynchronous job in ThreadPoolReader - when read was not done from page cache.") \
    \
    M(AsynchronousReadWaitMicroseconds, "Time spent in waiting for asynchronous reads.") \
    M(ExternalDataSourceLocalCacheReadBytes, "Bytes read from local cache buffer in RemoteReadBufferCache") \

namespace ProfileEvents
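
The counter added to this M(...) list follows the usual ProfileEvents flow: call sites pull it in with an extern const Event declaration and bump it with ProfileEvents::increment, which is exactly how the cache code in the later hunks consumes ExternalDataSourceLocalCacheReadBytes. Below is a minimal, self-contained sketch of that flow; the types, array, and increment signature here are simplified stand-ins for illustration, not ClickHouse's real ProfileEvents implementation.

#include <atomic>
#include <cstddef>
#include <iostream>

namespace ProfileEvents
{
    using Event = size_t;

    // Declaration as it would appear at a use site, followed by the definition
    // that the M(...) list in ProfileEvents.cpp ultimately expands to.
    extern const Event ExternalDataSourceLocalCacheReadBytes;
    const Event ExternalDataSourceLocalCacheReadBytes = 0;

    std::atomic<size_t> counters[1];

    void increment(Event event, size_t amount = 1)
    {
        counters[event] += amount;
    }
}

int main()
{
    // e.g. after serving 4096 bytes from the local cache buffer:
    ProfileEvents::increment(ProfileEvents::ExternalDataSourceLocalCacheReadBytes, 4096);
    std::cout << ProfileEvents::counters[ProfileEvents::ExternalDataSourceLocalCacheReadBytes] << "\n";
    return 0;
}
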
@@ -9,6 +9,7 @@
#include <base/logger_useful.h>
#include <base/sleep.h>
#include <base/errnoToString.h>
#include <Common/ProfileEvents.h>
#include <Common/SipHash.h>
#include <Common/hex.h>
#include <Common/Exception.h>
@@ -16,7 +17,10 @@
#include <IO/WriteHelpers.h>

namespace fs = std::filesystem;

namespace ProfileEvents
{
    extern const Event ExternalDataSourceLocalCacheReadBytes;
}
namespace DB
{
namespace ErrorCodes
@@ -55,8 +59,15 @@ std::shared_ptr<RemoteCacheController> RemoteCacheController::recover(const std:
        LOG_INFO(log, "Recover cached file failed. local path:{}", local_path_.string());
        return nullptr;
    }

    cache_controller->file_metadata_ptr = RemoteFileMetadataFactory::instance().get(cache_controller->metadata_class);
    try
    {
        cache_controller->file_metadata_ptr = RemoteFileMetadataFactory::instance().get(cache_controller->metadata_class);
    }
    catch (...)
    {
        LOG_ERROR(log, "Get metadata class failed for {}", cache_controller->metadata_class);
        cache_controller->file_metadata_ptr = nullptr;
    }
    if (!cache_controller->file_metadata_ptr)
    {
        // do not load this invalid cached file and clear it. the clear action is in
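
The point of the new try/catch is that recovery degrades gracefully: a factory lookup for an unknown metadata class would previously let the exception propagate out of recover(), whereas now it is logged and file_metadata_ptr is left null, so the stale cache entry is skipped instead of aborting recovery. A minimal sketch of that pattern follows, with a hypothetical factory standing in for RemoteFileMetadataFactory; the names and types are illustrative assumptions, only the control flow mirrors the diff.

#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>

struct IFileMetadata { virtual ~IFileMetadata() = default; };
struct ParquetMetadata : IFileMetadata {};

// Hypothetical registry: throws when asked for a metadata class it does not know.
struct MetadataFactory
{
    std::map<std::string, std::function<std::shared_ptr<IFileMetadata>()>> creators;

    std::shared_ptr<IFileMetadata> get(const std::string & name) const
    {
        auto it = creators.find(name);
        if (it == creators.end())
            throw std::runtime_error("unknown metadata class: " + name);
        return it->second();
    }
};

// recover()-style lookup: never let the factory exception escape; return nullptr
// so the caller refuses to load this cached file instead of failing recovery.
std::shared_ptr<IFileMetadata> tryGetMetadata(const MetadataFactory & factory, const std::string & metadata_class)
{
    try
    {
        return factory.get(metadata_class);
    }
    catch (...)
    {
        std::cerr << "Get metadata class failed for " << metadata_class << "\n";
        return nullptr;
    }
}

int main()
{
    MetadataFactory factory;
    factory.creators["parquet"] = [] { return std::make_shared<ParquetMetadata>(); };

    auto ok = tryGetMetadata(factory, "parquet");        // known class -> object
    auto bad = tryGetMetadata(factory, "unknown_class"); // stale/unknown -> nullptr, entry skipped
    std::cout << (ok != nullptr) << " " << (bad != nullptr) << "\n";
    return 0;
}
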
@@ -96,6 +107,7 @@ RemoteCacheController::RemoteCacheController(
    // when we allocate a whole new file cache, file_metadata_ptr must not be null.
    if (file_metadata_ptr)
    {
        metadata_class = file_metadata_ptr->getName();
        auto metadata_file_writer = std::make_unique<WriteBufferFromFile>((local_path_ / "metadata.txt").string());
        auto str_buf = file_metadata_ptr->toString();
        metadata_file_writer->write(str_buf.c_str(), str_buf.size());
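
For a brand-new cache entry, the constructor persists the serialized metadata next to the cached data (local_path_/"metadata.txt"), which is presumably what recover() reads back after a restart to repopulate metadata_class and file_metadata_ptr. A small sketch of the same write-a-sidecar idea using plain std::ofstream instead of ClickHouse's WriteBufferFromFile; the path and payload below are made up for the example.

#include <filesystem>
#include <fstream>
#include <iostream>
#include <string>

namespace fs = std::filesystem;

// serialized_metadata stands in for file_metadata_ptr->toString().
void writeMetadataSidecar(const fs::path & local_path, const std::string & serialized_metadata)
{
    std::ofstream out(local_path / "metadata.txt", std::ios::binary | std::ios::trunc);
    out.write(serialized_metadata.data(), static_cast<std::streamsize>(serialized_metadata.size()));
}

int main()
{
    fs::path local_path = fs::temp_directory_path() / "remote_cache_demo";
    fs::create_directories(local_path);

    // Made-up payload; the real content comes from the metadata class implementation.
    writeMetadataSidecar(local_path, "{\"file_size\": 1024}");

    std::ifstream in(local_path / "metadata.txt");
    std::string line;
    std::getline(in, line);
    std::cout << line << "\n";
    return 0;
}
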
@@ -214,7 +226,7 @@ void RemoteCacheController::close()
std::unique_ptr<ReadBufferFromFileBase> RemoteCacheController::allocFile()
{
    ReadSettings settings;
    settings.local_fs_method = LocalFSReadMethod::read;
    //settings.local_fs_method = LocalFSReadMethod::read;
    auto file_buffer = createReadBufferFromFileBase((local_path / "data.bin").string(), settings);

    if (file_buffer)
@@ -302,9 +314,12 @@ bool RemoteReadBuffer::nextImpl()

    auto status = file_buffer->next();
    if (status)
    {
        BufferBase::set(file_buffer->buffer().begin(),
            file_buffer->buffer().size(),
            file_buffer->offset());
        ProfileEvents::increment(ProfileEvents::ExternalDataSourceLocalCacheReadBytes, file_buffer->available());
    }
    return status;
}
@@ -1903,7 +1903,7 @@ class ClickHouseInstance:
        self, cluster, base_path, name, base_config_dir, custom_main_configs, custom_user_configs,
        custom_dictionaries,
        macros, with_zookeeper, zookeeper_config_path, with_mysql_client, with_mysql, with_mysql8, with_mysql_cluster, with_kafka, with_kerberized_kafka,
        with_rabbitmq, with_nginx, with_kerberized_hdfs, with_mongo, with_redis, with_minio, with_azurite, with_jdbc_bridge, with_hive
        with_rabbitmq, with_nginx, with_kerberized_hdfs, with_mongo, with_redis, with_minio, with_azurite, with_jdbc_bridge, with_hive,
        with_cassandra, server_bin_path, odbc_bridge_bin_path, library_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, with_postgres, with_postgres_cluster,
        clickhouse_start_command=CLICKHOUSE_START_COMMAND,
        main_config_name="config.xml", users_config_name="users.xml", copy_common_configs=True,
@@ -22,4 +22,11 @@
        <libhdfs3_conf>/etc/clickhouse-server/extra_conf.d/hdfs-site.xml</libhdfs3_conf>
    </hdfs>

    <metric_log>
        <database>system</database>
        <table>metric_log</table>
        <flush_interval_milliseconds>1000</flush_interval_milliseconds>
        <collect_interval_milliseconds>1000</collect_interval_milliseconds>
    </metric_log>

</clickhouse>
@@ -1,6 +1,7 @@
import logging
import os

import time
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
@@ -26,6 +27,7 @@ def started_cluster():
def test_create_parquet_table(started_cluster):
    logging.info('Start testing creating hive table ...')
    node = started_cluster.instances['h0_0_0']
    node.query("set input_format_parquet_allow_missing_columns = true")
    result = node.query("""
    CREATE TABLE default.demo_parquet (`id` Nullable(String), `score` Nullable(Int32), `day` Nullable(String)) ENGINE = Hive('thrift://hivetest:9083', 'test', 'demo') PARTITION BY(day)
    """)
@@ -89,3 +91,22 @@ def test_text_count(started_cluster):
2021-11-16 2
"""
    assert result == expected_result

def test_parquet_groupby_with_cache(started_cluster):
    logging.info('Start testing groupby ...')
    node = started_cluster.instances['h0_0_0']
    result = node.query("""
    SELECT day, count(*) FROM default.demo_parquet group by day order by day
    """)
    expected_result = """2021-11-01 1
2021-11-05 2
2021-11-11 1
2021-11-16 2
"""
    assert result == expected_result
def test_cache_read_bytes(started_cluster):
    node = started_cluster.instances['h0_0_0']
    time.sleep(3)
    result = node.query("select sum(ProfileEvent_ExternalDataSourceLocalCacheReadBytes) from system.metric_log where ProfileEvent_ExternalDataSourceLocalCacheReadBytes > 0")
    logging.info("Read bytes from cache:{}".format(result))
    assert result.strip() != '0'