Skip to content

Commit 0d960ee

Browse files
author
Innocent
committed
add ability to combine reporters
1 parent d26fb96 commit 0d960ee

20 files changed

Lines changed: 526 additions & 349 deletions

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ set(ICEBERG_SOURCES
5858
manifest/v3_metadata.cc
5959
metadata_columns.cc
6060
metrics_config.cc
61-
metrics_reporters.cc
61+
metrics/metrics_reporters.cc
6262
name_mapping.cc
6363
partition_field.cc
6464
partition_spec.cc

src/iceberg/catalog/memory/in_memory_catalog.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
#include <algorithm>
2323
#include <iterator>
2424

25-
#include "iceberg/metrics_reporters.h"
25+
#include "iceberg/metrics/metrics_reporters.h"
2626
#include "iceberg/table.h"
2727
#include "iceberg/table_identifier.h"
2828
#include "iceberg/table_metadata.h"

src/iceberg/catalog/memory/in_memory_catalog.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
#include <shared_mutex>
2323

2424
#include "iceberg/catalog.h"
25-
#include "iceberg/metrics_reporter.h"
25+
#include "iceberg/metrics/metrics_reporter.h"
2626

2727
namespace iceberg {
2828

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
#include "iceberg/catalog/rest/rest_util.h"
3838
#include "iceberg/catalog/rest/types.h"
3939
#include "iceberg/json_serde_internal.h"
40-
#include "iceberg/metrics_reporters.h"
40+
#include "iceberg/metrics/metrics_reporters.h"
4141
#include "iceberg/partition_spec.h"
4242
#include "iceberg/result.h"
4343
#include "iceberg/schema.h"

src/iceberg/catalog/rest/rest_catalog.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
#include "iceberg/catalog/rest/endpoint.h"
2929
#include "iceberg/catalog/rest/iceberg_rest_export.h"
3030
#include "iceberg/catalog/rest/type_fwd.h"
31-
#include "iceberg/metrics_reporter.h"
31+
#include "iceberg/metrics/metrics_reporter.h"
3232
#include "iceberg/result.h"
3333

3434
/// \file iceberg/catalog/rest/rest_catalog.h

src/iceberg/constants.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ namespace iceberg {
3333
constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";
3434
constexpr int64_t kInvalidSnapshotId = -1;
3535
constexpr int64_t kInvalidSequenceNumber = -1;
36+
constexpr int64_t kInvalidSchemaId = -1;
3637
/// \brief Stand-in for the current sequence number that will be assigned when the commit
3738
/// is successful. This is replaced when writing a manifest list by the ManifestFile
3839
/// adapter.

src/iceberg/meson.build

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ iceberg_sources = files(
7575
'manifest/v2_metadata.cc',
7676
'manifest/v3_metadata.cc',
7777
'metadata_columns.cc',
78+
'metrics/metrics_reporters.cc',
7879
'metrics_config.cc',
79-
'metrics_reporters.cc',
8080
'name_mapping.cc',
8181
'partition_field.cc',
8282
'partition_spec.cc',
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <cstdint>
23+
#include <string>
24+
#include <unordered_map>
25+
26+
#include "iceberg/constants.h"
27+
#include "iceberg/iceberg_export.h"
28+
29+
namespace iceberg {
30+
31+
/// \brief Metrics collected during a table commit (snapshot creation).
32+
///
33+
struct ICEBERG_EXPORT CommitMetrics {
34+
/// \brief Number of data files added in this commit.
35+
int64_t added_data_files = 0;
36+
/// \brief Number of data files removed in this commit.
37+
int64_t removed_data_files = 0;
38+
/// \brief Total live data files after this commit.
39+
int64_t total_data_files = 0;
40+
/// \brief Number of delete files added in this commit.
41+
int64_t added_delete_files = 0;
42+
/// \brief Equality delete files added.
43+
int64_t added_equality_delete_files = 0;
44+
/// \brief Positional delete files added.
45+
int64_t added_positional_delete_files = 0;
46+
/// \brief Deletion vectors added.
47+
int64_t added_dvs = 0;
48+
/// \brief Positional delete files removed.
49+
int64_t removed_positional_delete_files = 0;
50+
/// \brief Deletion vectors removed.
51+
int64_t removed_dvs = 0;
52+
/// \brief Equality delete files removed.
53+
int64_t removed_equality_delete_files = 0;
54+
/// \brief Number of delete files removed in this commit.
55+
int64_t removed_delete_files = 0;
56+
/// \brief Total live delete files after this commit.
57+
int64_t total_delete_files = 0;
58+
/// \brief Number of records added in this commit.
59+
int64_t added_records = 0;
60+
/// \brief Number of records removed in this commit.
61+
int64_t removed_records = 0;
62+
/// \brief Total live records after this commit.
63+
int64_t total_records = 0;
64+
/// \brief Total byte size of files added.
65+
int64_t added_files_size_bytes = 0;
66+
/// \brief Total byte size of files removed.
67+
int64_t removed_files_size_bytes = 0;
68+
/// \brief Total byte size of all live files after this commit.
69+
int64_t total_files_size_bytes = 0;
70+
/// \brief Positional delete records added.
71+
int64_t added_positional_deletes = 0;
72+
/// \brief Positional delete records removed.
73+
int64_t removed_positional_deletes = 0;
74+
/// \brief Total positional delete records after this commit.
75+
int64_t total_positional_deletes = 0;
76+
/// \brief Equality delete records added.
77+
int64_t added_equality_deletes = 0;
78+
/// \brief Equality delete records removed.
79+
int64_t removed_equality_deletes = 0;
80+
/// \brief Total equality delete records after this commit.
81+
int64_t total_equality_deletes = 0;
82+
/// \brief Manifest files kept unchanged in this commit.
83+
int64_t kept_manifest_count = 0;
84+
/// \brief Manifest files created in this commit.
85+
int64_t created_manifest_count = 0;
86+
/// \brief Manifest files replaced in this commit.
87+
int64_t replaced_manifest_count = 0;
88+
/// \brief Manifest entries processed in this commit.
89+
int64_t processed_manifest_entries_count = 0;
90+
};
91+
92+
/// \brief Report generated after a commit operation.
93+
///
94+
/// Contains metrics about the changes made in a commit, including
95+
/// files added/removed and retry information.
96+
struct ICEBERG_EXPORT CommitReport {
97+
/// \brief The fully qualified name of the table that was modified.
98+
std::string table_name;
99+
/// \brief The snapshot ID created by this commit.
100+
int64_t snapshot_id = kInvalidSnapshotId;
101+
/// \brief The sequence number assigned to this commit.
102+
int64_t sequence_number = kInvalidSequenceNumber;
103+
/// \brief The operation that was performed (append, overwrite, delete, etc.).
104+
std::string operation;
105+
/// \brief Metrics collected during the commit operation.
106+
CommitMetrics commit_metrics;
107+
/// \brief Additional key-value metadata.
108+
std::unordered_map<std::string, std::string> metadata;
109+
};
110+
111+
} // namespace iceberg
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <string_view>
23+
#include <variant>
24+
25+
#include "iceberg/iceberg_export.h"
26+
#include "iceberg/metrics/commit_report.h"
27+
#include "iceberg/metrics/scan_report.h"
28+
29+
namespace iceberg {
30+
31+
/// \brief The type of a metrics report.
32+
enum class MetricsReportType {
33+
kScanReport,
34+
kCommitReport,
35+
};
36+
37+
/// \brief Get the string representation of a metrics report type.
38+
ICEBERG_EXPORT constexpr std::string_view ToString(MetricsReportType type) noexcept {
39+
switch (type) {
40+
case MetricsReportType::kScanReport:
41+
return "scan";
42+
case MetricsReportType::kCommitReport:
43+
return "commit";
44+
}
45+
std::unreachable();
46+
}
47+
48+
/// \brief A metrics report, which can be either a ScanReport or CommitReport.
49+
///
50+
/// This variant type allows handling both report types uniformly through
51+
/// the MetricsReporter interface.
52+
using MetricsReport = std::variant<ScanReport, CommitReport>;
53+
54+
/// \brief Get the type of a metrics report.
55+
///
56+
/// \param report The metrics report to get the type of.
57+
/// \return The type of the metrics report.
58+
ICEBERG_EXPORT inline MetricsReportType GetReportType(const MetricsReport& report) {
59+
return std::visit(
60+
[](const auto& r) -> MetricsReportType {
61+
using T = std::decay_t<decltype(r)>;
62+
if constexpr (std::is_same_v<T, ScanReport>) {
63+
return MetricsReportType::kScanReport;
64+
} else {
65+
return MetricsReportType::kCommitReport;
66+
}
67+
},
68+
report);
69+
}
70+
71+
/// \brief Interface for reporting metrics from Iceberg operations.
72+
///
73+
/// Implementations of this interface can be used to collect and report
74+
/// metrics about scan and commit operations. Common implementations include
75+
/// logging reporters, metrics collectors, and the noop reporter for testing.
76+
class ICEBERG_EXPORT MetricsReporter {
77+
public:
78+
virtual ~MetricsReporter() = default;
79+
80+
/// \brief Report a metrics report.
81+
///
82+
/// Implementations should handle the report according to their purpose
83+
/// (e.g., logging, sending to a metrics service, etc.).
84+
///
85+
/// \param report The metrics report to process.
86+
virtual void Report(const MetricsReport& report) = 0;
87+
};
88+
89+
} // namespace iceberg
Lines changed: 54 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717
* under the License.
1818
*/
1919

20-
#include "iceberg/metrics_reporters.h"
20+
#include "iceberg/metrics/metrics_reporters.h"
2121

22+
#include <iostream>
2223
#include <unordered_set>
2324

2425
#include "iceberg/util/string_util.h"
@@ -27,10 +28,10 @@ namespace iceberg {
2728

2829
namespace {
2930

30-
/// \brief Registry type for MetricsReporter factories with heterogeneous lookup support.
31+
/// \brief Registry type for MetricsReporter factories.
3132
using MetricsReporterRegistry = std::unordered_map<std::string, MetricsReporterFactory>;
3233

33-
/// \brief Get the set of known metrics reporter types.
34+
/// \brief Get the set of known built-in metrics reporter types.
3435
const std::unordered_set<std::string>& DefaultReporterTypes() {
3536
static const std::unordered_set<std::string> kReporterTypes = {
3637
std::string(kMetricsReporterTypeNoop),
@@ -45,48 +46,63 @@ std::string InferReporterType(
4546
if (it != properties.end() && !it->second.empty()) {
4647
return StringUtils::ToLower(it->second);
4748
}
48-
// Default to noop reporter
4949
return std::string(kMetricsReporterTypeNoop);
5050
}
5151

5252
/// \brief Metrics reporter that does nothing.
53-
///
54-
/// This is the default reporter used when no reporter is configured.
55-
/// It silently discards all reports.
5653
class NoopMetricsReporter : public MetricsReporter {
5754
public:
5855
static Result<std::unique_ptr<MetricsReporter>> Make(
5956
[[maybe_unused]] const std::unordered_map<std::string, std::string>& properties) {
6057
return std::make_unique<NoopMetricsReporter>();
6158
}
6259

63-
void Report([[maybe_unused]] const MetricsReport& report) override {
64-
// Intentionally empty - noop implementation discards all reports
65-
}
60+
void Report([[maybe_unused]] const MetricsReport& report) override {}
6661
};
6762

68-
/// \brief Template helper to create factory functions for reporter types.
6963
template <typename T>
7064
MetricsReporterFactory MakeReporterFactory() {
7165
return [](const std::unordered_map<std::string, std::string>& props)
7266
-> Result<std::unique_ptr<MetricsReporter>> { return T::Make(props); };
7367
}
7468

75-
/// \brief Create the default registry with built-in reporters.
7669
MetricsReporterRegistry CreateDefaultRegistry() {
7770
return {
7871
{std::string(kMetricsReporterTypeNoop), MakeReporterFactory<NoopMetricsReporter>()},
7972
};
8073
}
8174

82-
/// \brief Get the global registry of metrics reporter factories.
8375
MetricsReporterRegistry& GetRegistry() {
8476
static MetricsReporterRegistry registry = CreateDefaultRegistry();
8577
return registry;
8678
}
8779

8880
} // namespace
8981

82+
// --- CompositeMetricsReporter ---
83+
84+
CompositeMetricsReporter::CompositeMetricsReporter(
85+
std::unordered_set<std::shared_ptr<MetricsReporter>> reporters)
86+
: reporters_(std::move(reporters)) {}
87+
88+
void CompositeMetricsReporter::Report(const MetricsReport& report) {
89+
for (const auto& reporter : reporters_) {
90+
try {
91+
reporter->Report(report);
92+
} catch (...) {
93+
// Catch all exceptions to ensure one failing reporter doesn't prevent others from
94+
// receiving the report.
95+
}
96+
}
97+
}
98+
99+
const std::unordered_set<std::shared_ptr<MetricsReporter>>&
100+
CompositeMetricsReporter::Reporters() const {
101+
return reporters_;
102+
}
103+
104+
// --- MetricsReporters ---
105+
90106
void MetricsReporters::Register(std::string_view reporter_type,
91107
MetricsReporterFactory factory) {
92108
GetRegistry()[StringUtils::ToLower(reporter_type)] = std::move(factory);
@@ -109,4 +125,29 @@ Result<std::unique_ptr<MetricsReporter>> MetricsReporters::Load(
109125
return it->second(properties);
110126
}
111127

128+
std::shared_ptr<MetricsReporter> MetricsReporters::Combine(
129+
std::shared_ptr<MetricsReporter> first, std::shared_ptr<MetricsReporter> second) {
130+
if (!first) return second;
131+
if (!second || first.get() == second.get()) return first;
132+
133+
// Single-pass collection: insert into the set to flatten nested composites
134+
// and deduplicate by shared_ptr identity simultaneously.
135+
std::unordered_set<std::shared_ptr<MetricsReporter>> reporters;
136+
137+
auto collect = [&reporters](const std::shared_ptr<MetricsReporter>& r) {
138+
if (auto* composite = dynamic_cast<CompositeMetricsReporter*>(r.get())) {
139+
for (const auto& inner : composite->Reporters()) {
140+
reporters.insert(inner);
141+
}
142+
} else {
143+
reporters.insert(r);
144+
}
145+
};
146+
147+
collect(first);
148+
collect(second);
149+
150+
return std::make_shared<CompositeMetricsReporter>(std::move(reporters));
151+
}
152+
112153
} // namespace iceberg

0 commit comments

Comments
 (0)