Skip to content

Commit 4b1140b

Browse files
committed
Make unpack methods async for deterministic progress delivery
1 parent c8f6875 commit 4b1140b

6 files changed

Lines changed: 90 additions & 70 deletions

File tree

Sources/Containerization/Image/Unpacker/EXT4Unpacker.swift

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,19 +42,18 @@ public struct EXT4Unpacker: Unpacker {
4242
archive: URL,
4343
compression: ContainerizationArchive.Filter,
4444
at path: URL
45-
) throws {
45+
) async throws {
4646
let cleanedPath = try prepareUnpackPath(path: path)
4747
let filesystem = try EXT4.Formatter(
4848
FilePath(cleanedPath),
4949
minDiskSize: blockSizeInBytes
5050
)
5151
defer { try? filesystem.close() }
5252

53-
try filesystem.unpack(
53+
try await filesystem.unpack(
5454
source: archive,
5555
format: .paxRestricted,
56-
compression: compression,
57-
progress: nil
56+
compression: compression
5857
)
5958
}
6059
#endif
@@ -120,7 +119,7 @@ public struct EXT4Unpacker: Unpacker {
120119
filter: resolved.filter,
121120
file: resolved.file
122121
)
123-
try filesystem.unpack(reader: reader, progress: progress)
122+
try await filesystem.unpack(reader: reader, progress: progress)
124123
}
125124

126125
return .block(

Sources/ContainerizationEXT4/Formatter+Unpack.swift

Lines changed: 52 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,26 @@ private typealias Hardlinks = [FilePath: FilePath]
2525

2626
extension EXT4.Formatter {
2727
/// Unpack the provided archive on to the ext4 filesystem.
28-
public func unpack(reader: ArchiveReader, progress: ProgressHandler? = nil) throws {
28+
public func unpack(reader: ArchiveReader, progress: ProgressHandler? = nil) async throws {
29+
try await self.unpackEntries(reader: reader, progress: progress)
30+
}
31+
32+
/// Unpack an archive at the source URL on to the ext4 filesystem.
33+
public func unpack(
34+
source: URL,
35+
format: ContainerizationArchive.Format = .paxRestricted,
36+
compression: ContainerizationArchive.Filter = .gzip
37+
) async throws {
38+
let reader = try ArchiveReader(
39+
format: format,
40+
filter: compression,
41+
file: source
42+
)
43+
try await self.unpack(reader: reader)
44+
}
45+
46+
/// Core unpack logic. When `progress` is nil the handler calls are skipped.
47+
private func unpackEntries(reader: ArchiveReader, progress: ProgressHandler?) async throws {
2948
var hardlinks: Hardlinks = [:]
3049
// Allocate a single 128KiB reusable buffer for all files to minimize allocations
3150
// and reduce the number of read calls to libarchive.
@@ -39,35 +58,33 @@ extension EXT4.Formatter {
3958
continue
4059
}
4160

42-
defer {
43-
// Count the number of entries
44-
if let progress {
45-
Task {
46-
await progress([
47-
ProgressEvent(event: "add-items", value: 1)
48-
])
49-
}
50-
}
51-
}
52-
5361
pathEntry = preProcessPath(s: pathEntry)
5462
let path = FilePath(pathEntry)
5563

5664
if path.base.hasPrefix(".wh.") {
5765
if path.base == ".wh..wh..opq" { // whiteout directory
5866
try self.unlink(path: path.dir, directoryWhiteout: true)
67+
if let progress {
68+
await progress([ProgressEvent(event: "add-items", value: 1)])
69+
}
5970
continue
6071
}
6172
let startIndex = path.base.index(path.base.startIndex, offsetBy: ".wh.".count)
6273
let filePath = String(path.base[startIndex...])
6374
let dir: FilePath = path.dir
6475
try self.unlink(path: dir.join(filePath))
76+
if let progress {
77+
await progress([ProgressEvent(event: "add-items", value: 1)])
78+
}
6579
continue
6680
}
6781

6882
if let hardlink = entry.hardlink {
6983
let hl = preProcessPath(s: hardlink)
7084
hardlinks[path] = FilePath(hl)
85+
if let progress {
86+
await progress([ProgressEvent(event: "add-items", value: 1)])
87+
}
7188
continue
7289
}
7390
let ts = FileTimestamps(
@@ -84,13 +101,8 @@ extension EXT4.Formatter {
84101
uid: entry.owner,
85102
gid: entry.group, xattrs: entry.xattrs, fileBuffer: reusableBuffer)
86103

87-
// Count the size of files
88104
if let progress, let size = entry.size {
89-
Task {
90-
await progress([
91-
ProgressEvent(event: "add-size", value: Int64(size))
92-
])
93-
}
105+
await progress([ProgressEvent(event: "add-size", value: Int64(size))])
94106
}
95107
case .symbolicLink:
96108
var symlinkTarget: FilePath?
@@ -104,6 +116,10 @@ extension EXT4.Formatter {
104116
default:
105117
continue
106118
}
119+
120+
if let progress {
121+
await progress([ProgressEvent(event: "add-items", value: 1)])
122+
}
107123
}
108124
guard hardlinks.acyclic else {
109125
throw UnpackError.circularLinks
@@ -115,13 +131,13 @@ extension EXT4.Formatter {
115131
}
116132
}
117133

118-
/// Unpack an archive at the source URL on to the ext4 filesystem.
134+
/// Unpack an archive at the source URL on to the ext4 filesystem with progress reporting.
119135
public func unpack(
120136
source: URL,
121137
format: ContainerizationArchive.Format = .paxRestricted,
122138
compression: ContainerizationArchive.Filter = .gzip,
123-
progress: ProgressHandler? = nil
124-
) throws {
139+
progress: @escaping ProgressHandler
140+
) async throws {
125141
// For zstd, decompress once and reuse for both passes to avoid double decompression.
126142
let fileToRead: URL
127143
let readerFilter: ContainerizationArchive.Filter
@@ -141,34 +157,30 @@ extension EXT4.Formatter {
141157
}
142158
}
143159

144-
// Optional first pass: scan headers to get total size (fast, metadata only)
145-
if let progress {
146-
let sizeReader = try ArchiveReader(
147-
format: format,
148-
filter: readerFilter,
149-
file: fileToRead
150-
)
151-
var totalSize: Int64 = 0
152-
for (entry, _) in sizeReader.makeStreamingIterator() {
153-
try Task.checkCancellation()
154-
if entry.fileType == .regular, let size = entry.size {
155-
totalSize += Int64(size)
156-
}
157-
}
158-
if totalSize > 0 {
159-
Task {
160-
await progress([ProgressEvent(event: "add-total-size", value: totalSize)])
161-
}
160+
// First pass: scan headers to get total size (fast, metadata only)
161+
let sizeReader = try ArchiveReader(
162+
format: format,
163+
filter: readerFilter,
164+
file: fileToRead
165+
)
166+
var totalSize: Int64 = 0
167+
for (entry, _) in sizeReader.makeStreamingIterator() {
168+
try Task.checkCancellation()
169+
if entry.fileType == .regular, let size = entry.size {
170+
totalSize += Int64(size)
162171
}
163172
}
173+
if totalSize > 0 {
174+
await progress([ProgressEvent(event: "add-total-size", value: totalSize)])
175+
}
164176

165177
// Second pass: unpack
166178
let reader = try ArchiveReader(
167179
format: format,
168180
filter: readerFilter,
169181
file: fileToRead
170182
)
171-
try self.unpack(reader: reader, progress: progress)
183+
try await self.unpack(reader: reader, progress: progress)
172184
}
173185

174186
private func preProcessPath(s: String) -> String {

Sources/cctl/RootfsCommand.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ extension Application {
9696

9797
private func outputExt4(archive: URL, to path: URL) async throws {
9898
let unpacker = EXT4Unpacker(blockSizeInBytes: 256.mib())
99-
try unpacker.unpack(archive: archive, compression: .gzip, at: path)
99+
try await unpacker.unpack(archive: archive, compression: .gzip, at: path)
100100
}
101101

102102
private func outputImage(path: URL, reference: String) async throws {

Tests/ContainerizationEXT4Tests/TestEXT4Unpacker.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ struct Ext4UnpackerTests {
2828
.appendingPathComponent("ext4.unpacked.oci.img.delme", isDirectory: false))
2929

3030
final class MockEXT4Unpacker {
31-
static func Unpack(index: String, fsPath: FilePath) throws {
31+
static func Unpack(index: String, fsPath: FilePath) async throws {
3232
let fs = try EXT4.Formatter(fsPath)
3333
let bundle = Bundle.module
3434
guard let indexPath = bundle.url(forResource: index, withExtension: nil) else {
@@ -67,14 +67,14 @@ struct Ext4UnpackerTests {
6767
guard let layerPath = bundle.url(forResource: layerDigest, withExtension: nil) else {
6868
throw NSError(domain: "layer \(layerDigest) not found", code: 1)
6969
}
70-
try fs.unpack(source: layerPath)
70+
try await fs.unpack(source: layerPath)
7171
}
7272
try fs.close()
7373
}
7474
}
7575

76-
@Test func eXT4Unpacker() throws {
77-
try MockEXT4Unpacker.Unpack(index: self.indexSHA, fsPath: self.fsPath)
76+
@Test func eXT4Unpacker() async throws {
77+
try await MockEXT4Unpacker.Unpack(index: self.indexSHA, fsPath: self.fsPath)
7878
let ext4 = try EXT4.EXT4Reader(blockDevice: self.fsPath)
7979
let children = try ext4.children(of: EXT4.RootInode)
8080
#expect(

Tests/ContainerizationEXT4Tests/TestFormatterUnpack.swift

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,19 @@ struct Tar2EXT4Test: ~Copyable {
4242
"extendedattribute.test": Data([15, 26, 54, 1, 2, 4, 6, 7, 7]),
4343
]
4444

45+
let layer1Path: URL
46+
let layer2Path: URL
47+
4548
init() throws {
46-
// create layer1
49+
// Compute paths before any throwing code to satisfy ~Copyable initialization rules.
4750
let layer1Path = FileManager.default.uniqueTemporaryDirectory()
4851
.appendingPathComponent("layer1.tar.gz", isDirectory: false)
52+
let layer2Path = FileManager.default.uniqueTemporaryDirectory()
53+
.appendingPathComponent("layer2.tar.gz", isDirectory: false)
54+
self.layer1Path = layer1Path
55+
self.layer2Path = layer2Path
56+
57+
// create layer1
4958
let layer1Archiver = try ArchiveWriter(
5059
configuration: ArchiveWriterConfiguration(format: .paxRestricted, filter: .gzip))
5160
try layer1Archiver.open(file: layer1Path)
@@ -57,8 +66,6 @@ struct Tar2EXT4Test: ~Copyable {
5766
try layer1Archiver.finishEncoding()
5867

5968
// create layer2
60-
let layer2Path = FileManager.default.uniqueTemporaryDirectory()
61-
.appendingPathComponent("layer2.tar.gz", isDirectory: false)
6269
let layer2Archiver = try ArchiveWriter(
6370
configuration: ArchiveWriterConfiguration(format: .paxRestricted, filter: .gzip))
6471
try layer2Archiver.open(file: layer2Path)
@@ -82,18 +89,23 @@ struct Tar2EXT4Test: ~Copyable {
8289
// a new layer overwriting over an existing layer
8390
try layer2Archiver.writeEntry(entry: WriteEntry.file(path: "/dir2/file1", permissions: 0o644), data: nil)
8491
try layer2Archiver.finishEncoding()
92+
}
8593

86-
let unpacker = try EXT4.Formatter(fsPath)
87-
try unpacker.unpack(source: layer1Path)
88-
try unpacker.unpack(source: layer2Path)
89-
try unpacker.close()
94+
private func unpackLayers() async throws {
95+
let formatter = try EXT4.Formatter(fsPath)
96+
try await formatter.unpack(source: layer1Path)
97+
try await formatter.unpack(source: layer2Path)
98+
try formatter.close()
9099
}
91100

92101
deinit {
93102
try? FileManager.default.removeItem(at: fsPath.url)
103+
try? FileManager.default.removeItem(at: layer1Path.deletingLastPathComponent())
104+
try? FileManager.default.removeItem(at: layer2Path.deletingLastPathComponent())
94105
}
95106

96-
@Test func testUnpackBasic() throws {
107+
@Test func testUnpackBasic() async throws {
108+
try await unpackLayers()
97109
let ext4 = try EXT4.EXT4Reader(blockDevice: fsPath)
98110
// just a directory
99111
let dir1Inode = try ext4.getInode(number: 12)
@@ -196,12 +208,9 @@ struct UnpackProgressTest {
196208

197209
// Unpack with progress tracking
198210
let formatter = try EXT4.Formatter(fsPath)
199-
try formatter.unpack(source: archivePath, progress: progressHandler)
211+
try await formatter.unpack(source: archivePath, progress: progressHandler)
200212
try formatter.close()
201213

202-
// Allow async progress tasks to complete
203-
try await Task.sleep(for: .milliseconds(100))
204-
205214
// Analyze collected events
206215
let allEvents = await collector.allEvents()
207216

@@ -292,7 +301,7 @@ struct UnpackProgressTest {
292301
}
293302
}
294303

295-
@Test func progressHandlerIsOptional() throws {
304+
@Test func progressHandlerIsOptional() async throws {
296305
// Verify that unpacking works without a progress handler (existing behavior)
297306
let tempDir = FileManager.default.uniqueTemporaryDirectory()
298307
let archivePath = tempDir.appendingPathComponent("test.tar.gz", isDirectory: false)
@@ -314,7 +323,7 @@ struct UnpackProgressTest {
314323

315324
// Unpack without progress handler - should not throw
316325
let formatter = try EXT4.Formatter(fsPath)
317-
try formatter.unpack(source: archivePath)
326+
try await formatter.unpack(source: archivePath)
318327
try formatter.close()
319328

320329
// Verify the file was unpacked correctly

Tests/ContainerizationTests/ImageTests/HeaderScanTimingTests.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ struct ImageHeaderScanTimingTest {
6969
let compression = compressionFilter(for: layer.mediaType)
7070
let compressedSize = try FileManager.default.attributesOfItem(atPath: content.path.path)[.size] as? Int64 ?? 0
7171
let label = "\(image.label) layer \(i + 1)/\(manifest.layers.count) (\(layer.mediaType), \(formatBytes(compressedSize)) compressed)"
72-
try measureOverhead(url: content.path, compression: compression, label: label)
72+
try await measureOverhead(url: content.path, compression: compression, label: label)
7373
}
7474
}
7575
}
@@ -87,7 +87,7 @@ struct ImageHeaderScanTimingTest {
8787
}
8888
}
8989

90-
private func measureOverhead(url: URL, compression: ContainerizationArchive.Filter, label: String) throws {
90+
private func measureOverhead(url: URL, compression: ContainerizationArchive.Filter, label: String) async throws {
9191
let clock = ContinuousClock()
9292

9393
print("\n--- \(label) ---\n")
@@ -133,9 +133,9 @@ struct ImageHeaderScanTimingTest {
133133
let fsPath1 = FilePath(tempDir1.appendingPathComponent("no-progress.ext4.img", isDirectory: false))
134134
defer { try? FileManager.default.removeItem(at: tempDir1) }
135135

136-
let unpackOnlyDuration = try clock.measure {
136+
let unpackOnlyDuration = try await clock.measure {
137137
let formatter = try EXT4.Formatter(fsPath1)
138-
try formatter.unpack(source: url, compression: compression)
138+
try await formatter.unpack(source: url, compression: compression)
139139
try formatter.close()
140140
}
141141
print(" Unpack (no progress): \(unpackOnlyDuration)")
@@ -146,9 +146,9 @@ struct ImageHeaderScanTimingTest {
146146
defer { try? FileManager.default.removeItem(at: tempDir2) }
147147

148148
let noopProgress: ProgressHandler = { _ in }
149-
let withProgressDuration = try clock.measure {
149+
let withProgressDuration = try await clock.measure {
150150
let formatter = try EXT4.Formatter(fsPath2)
151-
try formatter.unpack(source: url, compression: compression, progress: noopProgress)
151+
try await formatter.unpack(source: url, compression: compression, progress: noopProgress)
152152
try formatter.close()
153153
}
154154
print(" Unpack (w/ progress): \(withProgressDuration)")

0 commit comments

Comments
 (0)