5050import org .apache .hadoop .fs .ContentSummary ;
5151import org .apache .hadoop .fs .FileStatus ;
5252import org .apache .hadoop .fs .FileSystem ;
53+ import org .apache .hadoop .fs .FileUtil ;
5354import org .apache .hadoop .fs .Path ;
5455import org .apache .hadoop .fs .permission .FsPermission ;
5556import org .apache .hadoop .hive .common .metrics .common .Metrics ;
@@ -353,11 +354,13 @@ public Stream<String> getTableNames() {
353354 private Path cacheDirPath ;
354355 private Path zeroRowsPath ;
355356 private long cacheSize = 0 ;
357+ private boolean isSafeCacheWriteEnabled ;
356358 private long maxCacheSize ;
357359 private long maxEntrySize ;
358360 private long maxEntryLifetime ;
359361 private ReadWriteLock rwLock = new ReentrantReadWriteLock ();
360362 private ScheduledFuture <?> invalidationPollFuture ;
363+ private String safeDir ;
361364
362365 private QueryResultsCache (HiveConf configuration ) throws IOException {
363366 this .conf = configuration ;
@@ -378,6 +381,8 @@ private QueryResultsCache(HiveConf configuration) throws IOException {
378381 // Results cache directory should be cleaned up at process termination.
379382 fs .deleteOnExit (cacheDirPath );
380383
384+ isSafeCacheWriteEnabled = conf .getBoolVar (HiveConf .ConfVars .HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED );
385+
381386 maxCacheSize = conf .getLongVar (HiveConf .ConfVars .HIVE_QUERY_RESULTS_CACHE_MAX_SIZE );
382387 maxEntrySize = conf .getLongVar (HiveConf .ConfVars .HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_SIZE );
383388 maxEntryLifetime = conf .getTimeVar (
@@ -415,6 +420,14 @@ public Path getCacheDirPath() {
415420 return cacheDirPath ;
416421 }
417422
423+ public void setSafeDir (String dirName ) {
424+ safeDir = dirName ;
425+ }
426+
427+ public String getSafeDir () {
428+ return safeDir ;
429+ }
430+
418431 /**
419432 * Check if the cache contains an entry for the requested LookupInfo.
420433 * @param request
@@ -513,6 +526,22 @@ public CacheEntry addToCache(QueryInfo queryInfo, ValidTxnWriteIdList txnWriteId
513526 return addedEntry ;
514527 }
515528
529+ public void removeInvalidStaleFiles (FileSystem fs , Set <FileStatus > files ) {
530+ rwLock .writeLock ().lock ();
531+ try {
532+ for (FileStatus f : files ) {
533+ try {
534+ fs .delete (f .getPath (), true );
535+ } catch (IOException e ) {
536+ LOG .warn ("Failed to clean up stale invalid file: {}" ,
537+ f .getPath (), e );
538+ }
539+ }
540+ } finally {
541+ rwLock .writeLock ().unlock ();
542+ }
543+ }
544+
516545 /**
517546 * Updates a pending cache entry with a FetchWork result from a finished query.
518547 * If successful the cache entry will be set to valid status and be usable for cached queries.
@@ -549,6 +578,41 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
549578 return false ;
550579 }
551580
581+ if (isSafeCacheWriteEnabled ) {
582+ Path resultDir = new Path (cacheDirPath , UUID .randomUUID ().toString ());
583+ FileSystem cacheFs = resultDir .getFileSystem (conf );
584+ cacheFs .mkdirs (resultDir );
585+
586+ Set <FileStatus > cacheFilesToFetch = new HashSet <>();
587+ rwLock .writeLock ().lock ();
588+ boolean succeeded = true ;
589+ try {
590+ for (FileStatus fs : fetchWork .getFilesToFetch ()) {
591+ FileSystem srcFs = fs .getPath ().getFileSystem (conf );
592+ Path srcFile = fs .getPath ();
593+ Path destFile = new Path (resultDir ,
594+ new Path (fs .getPath ().toString ().substring (safeDir .length () + 1 )));
595+ boolean isMoved =
596+ FileUtil .copy (srcFs , srcFile , cacheFs , destFile , false , conf );
597+ if (!isMoved ) {
598+ succeeded = false ;
599+ throw new IOException ("File copy failed for " + srcFile + " -> " + destFile );
600+ }
601+ cacheFilesToFetch .add (cacheFs .getFileStatus (destFile ));
602+ }
603+ } catch (IOException e ) {
604+ LOG .warn ("Failed to write cache entry to {}" , resultDir , e );
605+ } finally {
606+ rwLock .writeLock ().unlock ();
607+ if (!succeeded ) {
608+ removeInvalidStaleFiles (cacheFs , cacheFilesToFetch );
609+ return false ;
610+ }
611+ }
612+ fetchWork .setFilesToFetch (cacheFilesToFetch );
613+ fetchWork .setTblDir (new Path (resultDir , fetchWork .getTblDir ().toString ().substring (safeDir .length () + 1 )));
614+ }
615+
552616 // Synchronize on the cache entry so that no one else can invalidate this entry
553617 // while we are in the process of setting it to valid.
554618 synchronized (cacheEntry ) {
0 commit comments