Skip to content
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7885,6 +7885,9 @@ Multiple algorithms can be specified, e.g. 'dpsize,greedy'.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_database_paimon_rest_catalog, false, R"(
Allow experimental database engine DataLakeCatalog with catalog_type = 'paimon_rest'
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_database_s3_tables, false, R"(
Allow experimental database engine DataLakeCatalog with catalog_type = 's3tables' (Amazon S3 Tables Iceberg REST with SigV4)
)", EXPERIMENTAL) \
DECLARE(UInt64, webassembly_udf_max_fuel, 100'000, R"(
Fuel limit per WebAssembly UDF instance execution. Each WebAssembly instruction consumes some amount of fuel.
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"iceberg_expire_default_max_ref_age_ms", 9223372036854775807, 9223372036854775807, "New setting."},
{"max_skip_unavailable_shards_num", 0, 0, "New setting to limit the number of shards that can be silently skipped when skip_unavailable_shards is enabled."},
{"max_skip_unavailable_shards_ratio", 0, 0, "New setting to limit the ratio of shards that can be silently skipped when skip_unavailable_shards is enabled."},
{"allow_experimental_database_s3_tables", false, false, "New setting to enable experimental database S3 tables (AWS Iceberg REST catalog)."},
});
addSettingsChanges(settings_changes_history, "26.2",
{
Expand Down
3 changes: 2 additions & 1 deletion src/Core/SettingsEnums.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,8 @@ IMPLEMENT_SETTING_ENUM(
{"hive", DatabaseDataLakeCatalogType::ICEBERG_HIVE},
{"onelake", DatabaseDataLakeCatalogType::ICEBERG_ONELAKE},
{"biglake", DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE},
{"paimon_rest", DatabaseDataLakeCatalogType::PAIMON_REST}})
{"paimon_rest", DatabaseDataLakeCatalogType::PAIMON_REST},
{"s3tables", DatabaseDataLakeCatalogType::S3_TABLES}})

IMPLEMENT_SETTING_ENUM(
FileCachePolicy,
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsEnums.h
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ enum class DatabaseDataLakeCatalogType : uint8_t
ICEBERG_ONELAKE,
ICEBERG_BIGLAKE,
PAIMON_REST,
S3_TABLES,
};

DECLARE_SETTING_ENUM(DatabaseDataLakeCatalogType)
Expand Down
110 changes: 110 additions & 0 deletions src/Databases/DataLake/AWSV4Signer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#include "config.h"

#if USE_AVRO && USE_SSL && USE_AWS_S3

#include <Databases/DataLake/AWSV4Signer.h>

#include <Common/Exception.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/String.h>

#include <aws/core/auth/signer/AWSAuthV4Signer.h>
#include <aws/core/http/standard/StandardHttpRequest.h>
#include <aws/core/http/URI.h>
#include <aws/core/utils/memory/AWSMemory.h>

#include <sstream>
#include <utility>

namespace DB
{
/// Error codes thrown from this file; declared `extern` here, so their definitions live elsewhere.
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int S3_ERROR;
}
}

namespace DataLake
{
namespace
{

/// Translates a Poco HTTP method name into the matching AWS SDK `HttpMethod` value.
/// GET, POST, PUT, DELETE, HEAD and PATCH are recognised; any other method name
/// is rejected with a BAD_ARGUMENTS exception.
Aws::Http::HttpMethod mapPocoMethodToAws(const String & method)
{
    using AwsMethod = Aws::Http::HttpMethod;
    using PocoRequest = Poco::Net::HTTPRequest;

    if (method == PocoRequest::HTTP_GET)
        return AwsMethod::HTTP_GET;
    if (method == PocoRequest::HTTP_POST)
        return AwsMethod::HTTP_POST;
    if (method == PocoRequest::HTTP_PUT)
        return AwsMethod::HTTP_PUT;
    if (method == PocoRequest::HTTP_DELETE)
        return AwsMethod::HTTP_DELETE;
    if (method == PocoRequest::HTTP_HEAD)
        return AwsMethod::HTTP_HEAD;
    if (method == PocoRequest::HTTP_PATCH)
        return AwsMethod::HTTP_PATCH;

    throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported HTTP method for AWS SigV4 signing: {}", method);
}

}

/// Signs an HTTP request with AWS SigV4 and returns the headers to send.
///
/// A temporary AWS SDK request is built from `method`, `uri`, `extra_headers`
/// (minus any Authorization header) and `payload` (attached only when non-empty),
/// then handed to `signer` for the given `region` and `service`.
/// On success `out_headers` is cleared and refilled with every header of the
/// signed request except Host.
///
/// Throws BAD_ARGUMENTS for an unsupported `method` or when the signed request
/// carries no non-empty Authorization header, and S3_ERROR when the signer reports failure.
void signRequestWithAWSV4(
    const String & method,
    const Poco::URI & uri,
    const DB::HTTPHeaderEntries & extra_headers,
    const String & payload,
    Aws::Client::AWSAuthV4Signer & signer,
    const String & region,
    const String & service,
    DB::HTTPHeaderEntries & out_headers)
{
    const Aws::Http::URI target_uri(uri.toString().c_str());
    Aws::Http::Standard::StandardHttpRequest aws_request(target_uri, mapPocoMethodToAws(method));

    /// Forward the caller's headers so they take part in the signature.
    /// A caller-supplied Authorization header is left out of the request being signed.
    for (const auto & header : extra_headers)
    {
        const bool is_authorization = Poco::icompare(header.name, "authorization") == 0;
        if (!is_authorization)
        {
            aws_request.SetHeaderValue(
                Aws::String(header.name.c_str(), header.name.size()),
                Aws::String(header.value.c_str(), header.value.size()));
        }
    }

    /// Attach the body (if any) as a stream rewound to its beginning.
    if (!payload.empty())
    {
        auto content = Aws::MakeShared<std::stringstream>("AWSV4Signer");
        content->write(payload.data(), static_cast<std::streamsize>(payload.size()));
        content->seekg(0);
        aws_request.AddContentBody(content);
    }

    constexpr bool sign_body = true;
    const bool signing_succeeded = signer.SignRequest(aws_request, region.c_str(), service.c_str(), sign_body);
    if (!signing_succeeded)
        throw DB::Exception(DB::ErrorCodes::S3_ERROR, "AWS SigV4 signing failed");

    const auto & signed_headers = aws_request.GetHeaders();

    /// A reported success without a non-empty Authorization header is treated as a credentials problem.
    bool authorization_present = false;
    for (const auto & [name, value] : signed_headers)
    {
        if (!value.empty() && Poco::icompare(name, "authorization") == 0)
        {
            authorization_present = true;
            break;
        }
    }

    if (!authorization_present)
    {
        throw DB::Exception(
            DB::ErrorCodes::BAD_ARGUMENTS,
            "AWS credentials are missing or incomplete; cannot sign S3 Tables REST request");
    }

    /// Hand back everything except Host.
    out_headers.clear();
    for (const auto & [name, value] : signed_headers)
    {
        const bool is_host = Poco::icompare(name, "host") == 0;
        if (is_host)
            continue;
        out_headers.emplace_back(String(name.c_str(), name.size()), String(value.c_str(), value.size()));
    }
}

}

#endif
34 changes: 34 additions & 0 deletions src/Databases/DataLake/AWSV4Signer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#pragma once

#include "config.h"

#if USE_AVRO && USE_SSL && USE_AWS_S3

#include <Core/Types.h>
#include <IO/HTTPHeaderEntries.h>
#include <Poco/URI.h>

/// Forward declaration: this header only refers to the signer by reference,
/// so it does not include any AWS SDK header itself.
namespace Aws::Client
{
class AWSAuthV4Signer;
}

namespace DataLake
{

/// Sign a Poco-style HTTP request using the AWS SDK's AWSAuthV4Signer.
/// Builds a temporary Aws::Http::StandardHttpRequest, signs it, then extracts
/// the resulting headers into out_headers (excluding Host; ReadWriteBufferFromHTTP sets it from the URI).
///
/// Parameters:
/// - method: HTTP method name; GET, POST, PUT, DELETE, HEAD and PATCH are supported.
/// - uri: request URI that is signed.
/// - extra_headers: headers added to the request before signing; an Authorization header, if present, is skipped.
/// - payload: request body; attached to the signed request only when non-empty.
/// - signer: AWS SDK signer that performs the signing.
/// - region, service: passed to the signer as the signing region and service name.
/// - out_headers: cleared and then filled with the headers of the signed request (without Host).
///
/// Throws BAD_ARGUMENTS for an unsupported method or when no non-empty Authorization header
/// is present after signing, and S3_ERROR when the signer reports failure.
void signRequestWithAWSV4(
const String & method,
const Poco::URI & uri,
const DB::HTTPHeaderEntries & extra_headers,
const String & payload,
Aws::Client::AWSAuthV4Signer & signer,
const String & region,
const String & service,
DB::HTTPHeaderEntries & out_headers);

}

#endif
54 changes: 51 additions & 3 deletions src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
#include <Databases/DataLake/RestCatalog.h>
#include <Databases/DataLake/GlueCatalog.h>
#include <Databases/DataLake/PaimonRestCatalog.h>
#if USE_AWS_S3 && USE_SSL
#include <Databases/DataLake/S3TablesCatalog.h>
#endif
#include <DataTypes/DataTypeString.h>

#include <Storages/ObjectStorage/S3/Configuration.h>
Expand Down Expand Up @@ -91,6 +94,7 @@ namespace Setting
extern const SettingsBool allow_experimental_database_glue_catalog;
extern const SettingsBool allow_experimental_database_hms_catalog;
extern const SettingsBool allow_experimental_database_paimon_rest_catalog;
extern const SettingsBool allow_experimental_database_s3_tables;
extern const SettingsBool use_hive_partitioning;
extern const SettingsBool parallel_replicas_for_cluster_engines;
extern const SettingsString cluster_for_parallel_replicas;
Expand Down Expand Up @@ -143,8 +147,20 @@ void DatabaseDataLake::validateSettings()
{
if (settings[DatabaseDataLakeSetting::region].value.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for Glue Catalog. "
ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for Glue catalog. "
"Please specify 'SETTINGS region=<region_name>' in the CREATE DATABASE query");
}
else if (settings[DatabaseDataLakeSetting::catalog_type].value == DB::DatabaseDataLakeCatalogType::S3_TABLES)
{
if (settings[DatabaseDataLakeSetting::region].value.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for S3 Tables catalog. "
"Please specify 'SETTINGS region=<region_name>' in the CREATE DATABASE query");

if (settings[DatabaseDataLakeSetting::warehouse].value.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "`warehouse` setting cannot be empty for S3 Tables catalog. "
"Please specify 'SETTINGS warehouse=<table_bucket_arn>' in the CREATE DATABASE query");
}
else if (settings[DatabaseDataLakeSetting::warehouse].value.empty())
{
Expand Down Expand Up @@ -299,6 +315,23 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
}
break;
}
case DB::DatabaseDataLakeCatalogType::S3_TABLES:
{
#if USE_AWS_S3 && USE_SSL
catalog_impl = std::make_shared<DataLake::S3TablesCatalog>(
settings[DatabaseDataLakeSetting::warehouse].value,
url,
settings[DatabaseDataLakeSetting::region].value,
catalog_parameters,
settings[DatabaseDataLakeSetting::namespaces].value,
Context::getGlobalContextInstance());
#else
throw Exception(
ErrorCodes::SUPPORT_IS_DISABLED,
"Amazon S3 Tables catalog requires ClickHouse built with USE_AWS_S3 and USE_SSL");
#endif
break;
}
}
return catalog_impl;
}
Expand Down Expand Up @@ -331,6 +364,7 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration(
case DatabaseDataLakeCatalogType::ICEBERG_HIVE:
case DatabaseDataLakeCatalogType::ICEBERG_REST:
case DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE:
case DatabaseDataLakeCatalogType::S3_TABLES:
{
switch (type)
{
Expand Down Expand Up @@ -951,9 +985,10 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name);
}

if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST)
if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST
&& catalog_type != DatabaseDataLakeCatalogType::S3_TABLES)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must have `rest` catalog type only");
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must use `rest` or `s3tables` catalog type only");
}

for (auto & engine_arg : engine_args)
Expand Down Expand Up @@ -1039,6 +1074,19 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
engine_func->name = "Paimon";
break;
}
case DatabaseDataLakeCatalogType::S3_TABLES:
{
if (!args.create_query.attach
&& !args.context->getSettingsRef()[Setting::allow_experimental_database_s3_tables])
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"DatabaseDataLake with S3 Tables catalog is experimental. "
"To allow its usage, enable setting allow_experimental_database_s3_tables");
}

engine_func->name = "Iceberg";
break;
}
case DatabaseDataLakeCatalogType::NONE:
break;
}
Expand Down
5 changes: 5 additions & 0 deletions src/Databases/DataLake/ICatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ bool TableMetadata::hasStorageCredentials() const
return storage_credentials != nullptr;
}

/// Returns true when `data_lake_specific_metadata` currently holds a value.
bool TableMetadata::hasDataLakeSpecificProperties() const
{
return data_lake_specific_metadata.has_value();
}

std::string TableMetadata::getMetadataLocation(const std::string & iceberg_metadata_file_location) const
{
std::string metadata_location = iceberg_metadata_file_location;
Expand Down
35 changes: 25 additions & 10 deletions src/Databases/DataLake/RestCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,12 @@ void RestCatalog::parseCatalogConfigurationSettings(const Poco::JSON::Object::Pt
result.default_base_location = object->get("default-base-location").extract<String>();
}

DB::HTTPHeaderEntries RestCatalog::getAuthHeaders(bool update_token) const
DB::HTTPHeaderEntries RestCatalog::getAuthHeaders(
bool update_token,
const String & /*method*/,
const Poco::URI & /*url*/,
const DB::HTTPHeaderEntries & /*extra_headers*/,
const String & /*body*/) const
{
/// Option 1: user specified auth header manually.
/// Header has format: 'Authorization: <scheme> <token>'.
Expand Down Expand Up @@ -387,7 +392,12 @@ BigLakeCatalog::BigLakeCatalog(
config = loadConfig();
}

DB::HTTPHeaderEntries BigLakeCatalog::getAuthHeaders(bool update_token) const
DB::HTTPHeaderEntries BigLakeCatalog::getAuthHeaders(
bool update_token,
const String & /*method*/,
const Poco::URI & /*url*/,
const DB::HTTPHeaderEntries & /*extra_headers*/,
const String & /*body*/) const
{
/// Google Cloud OAuth2 for BigLake.
/// Uses GCP metadata service or Application Default Credentials to get access token.
Expand Down Expand Up @@ -542,7 +552,7 @@ DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer(

auto create_buffer = [&](bool update_token)
{
auto result_headers = getAuthHeaders(update_token);
auto result_headers = getAuthHeaders(update_token, Poco::Net::HTTPRequest::HTTP_GET, url, headers, {});
std::move(headers.begin(), headers.end(), std::back_inserter(result_headers));

return DB::BuilderRWBufferFromHTTP(url)
Expand Down Expand Up @@ -978,9 +988,6 @@ void RestCatalog::sendRequest(const String & endpoint, Poco::JSON::Object::Ptr r
request_body->stringify(oss);
const std::string body_str = DB::removeEscapedSlashes(oss.str());

DB::HTTPHeaderEntries headers = getAuthHeaders(/* update_token = */ true);
headers.emplace_back("Content-Type", "application/json");

const auto & context = getContext();

DB::ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback;
Expand All @@ -994,6 +1001,12 @@ void RestCatalog::sendRequest(const String & endpoint, Poco::JSON::Object::Ptr r

/// enable_url_encoding=false to allow use tables with encoded sequences in names like 'foo%2Fbar'
Poco::URI url(endpoint, /* enable_url_encoding */ false);

DB::HTTPHeaderEntries extra_headers;
extra_headers.emplace_back("Content-Type", "application/json");

DB::HTTPHeaderEntries headers = getAuthHeaders(/* update_token = */ true, method, url, extra_headers, body_str);
headers.emplace_back("Content-Type", "application/json");
auto wb = DB::BuilderRWBufferFromHTTP(url)
.withConnectionGroup(DB::HTTPConnectionGroupType::HTTP)
.withMethod(method)
Expand All @@ -1014,7 +1027,7 @@ void RestCatalog::sendRequest(const String & endpoint, Poco::JSON::Object::Ptr r

void RestCatalog::createNamespaceIfNotExists(const String & namespace_name, const String & location) const
{
const std::string endpoint = fmt::format("{}/namespaces", base_url);
const std::string endpoint = base_url / config.prefix / "namespaces";

Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object;
{
Expand Down Expand Up @@ -1046,7 +1059,7 @@ void RestCatalog::createTable(const String & namespace_name, const String & tabl

createNamespaceIfNotExists(namespace_name, metadata_content->getValue<String>("location"));

const std::string endpoint = fmt::format("{}/namespaces/{}/tables", base_url, namespace_name);
const std::string endpoint = base_url / config.prefix / "namespaces" / namespace_name / "tables";

Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object;
request_body->set("name", table_name);
Expand Down Expand Up @@ -1083,7 +1096,7 @@ void RestCatalog::createTable(const String & namespace_name, const String & tabl

bool RestCatalog::updateMetadata(const String & namespace_name, const String & table_name, const String & /*new_metadata_path*/, Poco::JSON::Object::Ptr new_snapshot) const
{
const std::string endpoint = fmt::format("{}/namespaces/{}/tables/{}", base_url, namespace_name, table_name);
const std::string endpoint = base_url / config.prefix / "namespaces" / namespace_name / "tables" / table_name;

Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object;
{
Expand Down Expand Up @@ -1153,7 +1166,9 @@ void RestCatalog::dropTable(const String & namespace_name, const String & table_
"Failed to drop table {}, namespace {} is filtered by `namespaces` database parameter",
table_name, namespace_name);

const std::string endpoint = fmt::format("{}/namespaces/{}/tables/{}?purgeRequested=False", base_url, namespace_name, table_name);
const std::string endpoint
= (base_url / config.prefix / "namespaces" / namespace_name / "tables" / table_name).string()
+ "?purgeRequested=False";

Poco::JSON::Object::Ptr request_body = nullptr;
try
Expand Down
Loading
Loading