Skip to content

Commit bfc3d37

Browse files
committed
corrected implementation
1 parent 7b2d61b commit bfc3d37

6 files changed

Lines changed: 135 additions & 102 deletions

File tree

common/src/java/org/apache/hadoop/hive/conf/HiveConf.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5679,9 +5679,9 @@ public static enum ConfVars {
56795679
HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true,
56805680
"If the query results cache is enabled. This will keep results of previously executed queries " +
56815681
"to be reused if the same query is executed again."),
5682-
HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED("hive.query.results.safe.cache.write.enabled", false,
5683-
"If the query results safe cache is enabled. This will safely write to cache directory by first evaluating " +
5684-
"the cache entry is not overspilling the the cache directory before writing it to cache directory "),
5682+
HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED("hive.query.results.cache.safe.write.enabled", false,
5683+
"Write results to the query output path, then copy them into the results cache if size limits allow. " +
5684+
"Adds overhead from that copy versus writing result files once under the cache directory."),
56855685
HIVE_QUERY_RESULTS_CACHE_NONTRANSACTIONAL_TABLES_ENABLED("hive.query.results.cache.nontransactional.tables.enabled", false,
56865686
"If the query results cache is enabled for queries involving non-transactional tables." +
56875687
"Users who enable this setting should be willing to tolerate some amount of stale results in the cache."),

ql/src/java/org/apache/hadoop/hive/ql/Context.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,12 @@ public class Context {
107107
// keeps track of result cache dir for the query, later cleaned up by context cleanup
108108
private Path fsResultCacheDirs = null;
109109

110+
/**
111+
* When HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED is true, the absolute path prefix under which
112+
* execution wrote fetchable result files.
113+
*/
114+
private String cacheSafeWriteSourceDir;
115+
110116
private Configuration conf;
111117
protected int pathid = 10000;
112118
private int moveTaskId = 0;
@@ -452,6 +458,7 @@ protected Context(Context ctx) {
452458
this.scratchDirPermission = ctx.scratchDirPermission;
453459
this.fsScratchDirs.putAll(ctx.fsScratchDirs);
454460
this.fsResultCacheDirs = ctx.fsResultCacheDirs;
461+
this.cacheSafeWriteSourceDir = ctx.cacheSafeWriteSourceDir;
455462
this.conf = ctx.conf;
456463
this.pathid = ctx.pathid;
457464
this.explainConfig = ctx.explainConfig;
@@ -492,6 +499,14 @@ public Path getFsResultCacheDirs() {
492499
return this.fsResultCacheDirs;
493500
}
494501

502+
public void setCacheSafeWriteSourceDir(String cacheSafeWriteSourceDir) {
503+
this.cacheSafeWriteSourceDir = cacheSafeWriteSourceDir;
504+
}
505+
506+
public String getCacheSafeWriteSourceDir() {
507+
return cacheSafeWriteSourceDir;
508+
}
509+
495510
public Map<LoadTableDesc, WriteEntity> getLoadTableOutputMap() {
496511
return loadTableOutputMap;
497512
}

ql/src/java/org/apache/hadoop/hive/ql/Executor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ private void preExecutionCacheActions() throws Exception {
227227
CacheEntry pendingCacheEntry =
228228
QueryResultsCache.getInstance().addToCache(driverContext.getCacheUsage().getQueryInfo(), txnWriteIdList);
229229
if (pendingCacheEntry != null) {
230+
pendingCacheEntry.setSafeSourceDir(context.getCacheSafeWriteSourceDir());
230231
// Update cacheUsage to reference the pending entry.
231232
this.driverContext.getCacheUsage().setCacheEntry(pendingCacheEntry);
232233
}
@@ -436,7 +437,7 @@ private void postExecutionCacheActions() throws Exception {
436437

437438
CacheEntry cacheEntry = driverContext.getCacheUsage().getCacheEntry();
438439
boolean savedToCache = QueryResultsCache.getInstance().setEntryValid(cacheEntry,
439-
driverContext.getPlan().getFetchTask().getWork());
440+
driverContext.getPlan().getFetchTask().getWork(), driverContext.getConf());
440441
LOG.info("savedToCache: {} ({})", savedToCache, cacheEntry);
441442
if (savedToCache) {
442443
useFetchFromCache(driverContext.getCacheUsage().getCacheEntry());

ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java

Lines changed: 102 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,12 @@ public static class CacheEntry {
195195
private Path cachedResultsPath;
196196
private Set<FileStatus> cachedResultPaths;
197197

198+
/**
199+
* Absolute path prefix for result files when using safe cache write; see
200+
* {@link org.apache.hadoop.hive.ql.Context#getCacheSafeWriteSourceDir()}.
201+
*/
202+
private String safeSourceDir;
203+
198204
// Cache administration
199205
private long size;
200206
private AtomicInteger readers = new AtomicInteger(0);
@@ -288,6 +294,14 @@ public QueryInfo getQueryInfo() {
288294
return queryInfo;
289295
}
290296

297+
public void setSafeSourceDir(String safeSourceDir) {
298+
this.safeSourceDir = safeSourceDir;
299+
}
300+
301+
public String getSafeSourceDir() {
302+
return safeSourceDir;
303+
}
304+
291305
public Path getCachedResultsPath() {
292306
return cachedResultsPath;
293307
}
@@ -354,13 +368,11 @@ public Stream<String> getTableNames() {
354368
private Path cacheDirPath;
355369
private Path zeroRowsPath;
356370
private long cacheSize = 0;
357-
private boolean isSafeCacheWriteEnabled;
358371
private long maxCacheSize;
359372
private long maxEntrySize;
360373
private long maxEntryLifetime;
361374
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
362375
private ScheduledFuture<?> invalidationPollFuture;
363-
private String safeDir;
364376

365377
private QueryResultsCache(HiveConf configuration) throws IOException {
366378
this.conf = configuration;
@@ -381,8 +393,6 @@ private QueryResultsCache(HiveConf configuration) throws IOException {
381393
// Results cache directory should be cleaned up at process termination.
382394
fs.deleteOnExit(cacheDirPath);
383395

384-
isSafeCacheWriteEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED);
385-
386396
maxCacheSize = conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE);
387397
maxEntrySize = conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_SIZE);
388398
maxEntryLifetime = conf.getTimeVar(
@@ -420,12 +430,17 @@ public Path getCacheDirPath() {
420430
return cacheDirPath;
421431
}
422432

423-
public void setSafeDir(String dirName) {
424-
safeDir = dirName;
425-
}
426-
427-
public String getSafeDir() {
428-
return safeDir;
433+
/**
434+
* Runs {@code action} while holding {@link #rwLock} in exclusive (write) mode.
435+
*/
436+
private void withWriteLock(Runnable action) {
437+
Lock writeLock = rwLock.writeLock();
438+
try {
439+
writeLock.lock();
440+
action.run();
441+
} finally {
442+
writeLock.unlock();
443+
}
429444
}
430445

431446
/**
@@ -507,10 +522,7 @@ public CacheEntry addToCache(QueryInfo queryInfo, ValidTxnWriteIdList txnWriteId
507522
addedEntry.queryInfo = queryInfo;
508523
addedEntry.txnWriteIdList = txnWriteIdList;
509524

510-
Lock writeLock = rwLock.writeLock();
511-
try {
512-
writeLock.lock();
513-
525+
withWriteLock(() -> {
514526
LOG.info("Adding placeholder cache entry for query '{}'", queryText);
515527

516528
// Add the entry to the cache structures while under write lock.
@@ -519,16 +531,13 @@ public CacheEntry addToCache(QueryInfo queryInfo, ValidTxnWriteIdList txnWriteId
519531
// Index of entries by table usage.
520532
addedEntry.getTableNames()
521533
.forEach(tableName -> addToEntryMap(tableToEntryMap, tableName, addedEntry));
522-
} finally {
523-
writeLock.unlock();
524-
}
534+
});
525535

526536
return addedEntry;
527537
}
528538

529539
public void removeInvalidStaleFiles(FileSystem fs, Set<FileStatus> files) {
530-
rwLock.writeLock().lock();
531-
try {
540+
withWriteLock(() -> {
532541
for (FileStatus f : files) {
533542
try {
534543
fs.delete(f.getPath(), true);
@@ -537,9 +546,7 @@ public void removeInvalidStaleFiles(FileSystem fs, Set<FileStatus> files) {
537546
f.getPath(), e);
538547
}
539548
}
540-
} finally {
541-
rwLock.writeLock().unlock();
542-
}
549+
});
543550
}
544551

545552
/**
@@ -549,9 +556,11 @@ public void removeInvalidStaleFiles(FileSystem fs, Set<FileStatus> files) {
549556
* CacheEntry.releaseReader() should be called when the caller is done with the cache entry.
550557
* @param cacheEntry
551558
* @param fetchWork
559+
* @param queryConf session (or query) Hive configuration; used for safe-cache-write and filesystem access
560+
* so per-session {@code SET hive.query.results.cache.safe.write.enabled} is honored
552561
* @return
553562
*/
554-
public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
563+
public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork, HiveConf queryConf) {
555564
Path queryResultsPath = null;
556565
Path cachedResultsPath = null;
557566

@@ -561,7 +570,7 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
561570

562571
boolean requiresCaching = true;
563572
queryResultsPath = fetchWork.getTblDir();
564-
FileSystem resultsFs = queryResultsPath.getFileSystem(conf);
573+
FileSystem resultsFs = queryResultsPath.getFileSystem(queryConf);
565574

566575
long resultSize = 0;
567576
for(FileStatus fs:fetchWork.getFilesToFetch()) {
@@ -578,37 +587,8 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
578587
return false;
579588
}
580589

581-
if (isSafeCacheWriteEnabled) {
582-
Path resultDir = new Path(cacheDirPath, UUID.randomUUID().toString());
583-
FileSystem cacheFs = resultDir.getFileSystem(conf);
584-
cacheFs.mkdirs(resultDir);
585-
586-
Set<FileStatus> cacheFilesToFetch = new HashSet<>();
587-
rwLock.writeLock().lock();
588-
boolean succeeded = true;
589-
try {
590-
for (FileStatus fs : fetchWork.getFilesToFetch()) {
591-
FileSystem srcFs = fs.getPath().getFileSystem(conf);
592-
Path srcFile = fs.getPath();
593-
Path destFile = new Path(resultDir,
594-
new Path(fs.getPath().toString().substring(safeDir.length() + 1)));
595-
succeeded = FileUtil.copy(srcFs, srcFile, cacheFs, destFile, false, conf);
596-
if (!succeeded) {
597-
throw new IOException("File copy failed for " + srcFile + " -> " + destFile);
598-
}
599-
cacheFilesToFetch.add(cacheFs.getFileStatus(destFile));
600-
}
601-
} catch (IOException e) {
602-
LOG.warn("Failed to write cache entry to {}", resultDir, e);
603-
} finally {
604-
rwLock.writeLock().unlock();
605-
}
606-
if (!succeeded) {
607-
removeInvalidStaleFiles(cacheFs, cacheFilesToFetch);
608-
return false;
609-
}
610-
fetchWork.setFilesToFetch(cacheFilesToFetch);
611-
fetchWork.setTblDir(new Path(resultDir, fetchWork.getTblDir().toString().substring(safeDir.length() + 1)));
590+
if (!rewriteFetchWorkForSafeCacheWrite(cacheEntry, fetchWork, queryConf)) {
591+
return false;
612592
}
613593

614594
// Synchronize on the cache entry so that no one else can invalidate this entry
@@ -663,10 +643,68 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
663643
return true;
664644
}
665645

646+
private boolean rewriteFetchWorkForSafeCacheWrite(CacheEntry cacheEntry, FetchWork fetchWork, HiveConf queryConf)
647+
throws IOException {
648+
if (!queryConf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED)) {
649+
return true;
650+
}
651+
String safeDir = cacheEntry.getSafeSourceDir();
652+
if (safeDir == null) {
653+
LOG.error("Safe cache write enabled but cache entry has no safe source dir; query: {}",
654+
cacheEntry.getQueryInfo().getLookupInfo().getQueryText());
655+
return false;
656+
}
657+
final int safeDirAndSepLen = safeDir.length() + Path.SEPARATOR.length();
658+
Path resultDir = new Path(cacheDirPath, UUID.randomUUID().toString());
659+
FileSystem cacheFs = resultDir.getFileSystem(queryConf);
660+
cacheFs.mkdirs(resultDir);
661+
662+
Set<FileStatus> cacheFilesToFetch = new HashSet<>();
663+
boolean succeeded =
664+
copyFetchWorkFilesIntoCacheDirUnderWriteLock(fetchWork, resultDir, cacheFs, safeDirAndSepLen,
665+
queryConf, cacheFilesToFetch);
666+
if (!succeeded) {
667+
removeInvalidStaleFiles(cacheFs, cacheFilesToFetch);
668+
return false;
669+
}
670+
fetchWork.setFilesToFetch(cacheFilesToFetch);
671+
fetchWork.setTblDir(new Path(resultDir, fetchWork.getTblDir().toString().substring(safeDirAndSepLen)));
672+
return true;
673+
}
674+
675+
/**
676+
* Copies each file from the fetch work into {@code resultDir}, preserving relative paths below
677+
* the safe-write prefix. Runs under {@link #rwLock} write lock so cache metadata and filesystem
678+
* layout stay consistent with other cache mutations.
679+
*
680+
* @param destFileStatuses output: destination {@link FileStatus} entries for successfully copied files
681+
* @return {@code true} if every file was copied and listed; {@code false} otherwise
682+
*/
683+
private boolean copyFetchWorkFilesIntoCacheDirUnderWriteLock(FetchWork fetchWork, Path resultDir,
684+
FileSystem cacheFs, int safeDirAndSepLen, HiveConf queryConf, Set<FileStatus> destFileStatuses) {
685+
final boolean[] succeeded = { true };
686+
withWriteLock(() -> {
687+
try {
688+
for (FileStatus fs : fetchWork.getFilesToFetch()) {
689+
FileSystem srcFs = fs.getPath().getFileSystem(queryConf);
690+
Path srcFile = fs.getPath();
691+
Path destFile = new Path(resultDir,
692+
new Path(fs.getPath().toString().substring(safeDirAndSepLen)));
693+
succeeded[0] = FileUtil.copy(srcFs, srcFile, cacheFs, destFile, false, queryConf);
694+
if (!succeeded[0]) {
695+
throw new IOException("File copy failed for " + srcFile + " -> " + destFile);
696+
}
697+
destFileStatuses.add(cacheFs.getFileStatus(destFile));
698+
}
699+
} catch (IOException e) {
700+
LOG.warn("Failed to write cache entry to {}", resultDir, e);
701+
}
702+
});
703+
return succeeded[0];
704+
}
705+
666706
public void clear() {
667-
Lock writeLock = rwLock.writeLock();
668-
try {
669-
writeLock.lock();
707+
withWriteLock(() -> {
670708
LOG.info("Clearing the results cache");
671709
CacheEntry[] allEntries = null;
672710
synchronized (lru) {
@@ -679,9 +717,7 @@ public void clear() {
679717
LOG.error("Error removing cache entry " + entry, err);
680718
}
681719
}
682-
} finally {
683-
writeLock.unlock();
684-
}
720+
});
685721
}
686722

687723
public long getSize() {
@@ -697,27 +733,21 @@ public long getSize() {
697733
public void notifyTableChanged(String dbName, String tableName, long updateTime) {
698734
LOG.debug("Table changed: {}.{}, at {}", dbName, tableName, updateTime);
699735
// Invalidate all cache entries using this table.
700-
List<CacheEntry> entriesToInvalidate = null;
701-
rwLock.writeLock().lock();
702-
try {
736+
withWriteLock(() -> {
703737
String key = (dbName.toLowerCase() + "." + tableName.toLowerCase());
704738
Set<CacheEntry> entriesForTable = tableToEntryMap.get(key);
705739
if (entriesForTable != null) {
706740
// Possible concurrent modification issues if we try to remove cache entries while
707741
// traversing the cache structures. Save the entries to remove in a separate list.
708-
entriesToInvalidate = new ArrayList<>(entriesForTable);
709-
}
710-
if (entriesToInvalidate != null) {
742+
List<CacheEntry> entriesToInvalidate = new ArrayList<>(entriesForTable);
711743
for (CacheEntry entry : entriesToInvalidate) {
712744
// Ignore updates that occured before this cached query was created.
713745
if (entry.getQueryInfo().getQueryTime() <= updateTime) {
714746
removeEntry(entry);
715747
}
716748
}
717749
}
718-
} finally {
719-
rwLock.writeLock().unlock();
720-
}
750+
});
721751
}
722752

723753
private static final int INITIAL_LRU_SIZE = 16;
@@ -799,15 +829,12 @@ private boolean entryMatches(LookupInfo lookupInfo, CacheEntry entry, Set<CacheE
799829

800830
public void removeEntry(CacheEntry entry) {
801831
entry.invalidate();
802-
rwLock.writeLock().lock();
803-
try {
832+
withWriteLock(() -> {
804833
removeFromLookup(entry);
805834
lru.remove(entry);
806835
// Should the cache size be updated here, or after the result data has actually been deleted?
807836
cacheSize -= entry.size;
808-
} finally {
809-
rwLock.writeLock().unlock();
810-
}
837+
});
811838
}
812839

813840
private void removeFromLookup(CacheEntry entry) {

0 commit comments

Comments
 (0)