Skip to content

Commit 439a314

Browse files
committed
refactored test
1 parent a1b2d57 commit 439a314

4 files changed

Lines changed: 263 additions & 7 deletions

File tree

common/src/java/org/apache/hadoop/hive/conf/HiveConf.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5679,7 +5679,9 @@ public static enum ConfVars {
56795679
HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true,
56805680
"If the query results cache is enabled. This will keep results of previously executed queries " +
56815681
"to be reused if the same query is executed again."),
5682-
5682+
HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED("hive.query.results.safe.cache.write.enabled", false,
5683+
"If the query results safe cache is enabled. This will safely write to cache directory by first evaluating " +
5684+
"the cache entry is not overspilling the cache directory before writing it to the cache directory."),
56835685
HIVE_QUERY_RESULTS_CACHE_NONTRANSACTIONAL_TABLES_ENABLED("hive.query.results.cache.nontransactional.tables.enabled", false,
56845686
"If the query results cache is enabled for queries involving non-transactional tables." +
56855687
"Users who enable this setting should be willing to tolerate some amount of stale results in the cache."),

ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.hadoop.fs.ContentSummary;
5151
import org.apache.hadoop.fs.FileStatus;
5252
import org.apache.hadoop.fs.FileSystem;
53+
import org.apache.hadoop.fs.FileUtil;
5354
import org.apache.hadoop.fs.Path;
5455
import org.apache.hadoop.fs.permission.FsPermission;
5556
import org.apache.hadoop.hive.common.metrics.common.Metrics;
@@ -353,11 +354,13 @@ public Stream<String> getTableNames() {
353354
private Path cacheDirPath;
354355
private Path zeroRowsPath;
355356
private long cacheSize = 0;
357+
private boolean isSafeCacheWriteEnabled;
356358
private long maxCacheSize;
357359
private long maxEntrySize;
358360
private long maxEntryLifetime;
359361
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
360362
private ScheduledFuture<?> invalidationPollFuture;
363+
private String safeDir;
361364

362365
private QueryResultsCache(HiveConf configuration) throws IOException {
363366
this.conf = configuration;
@@ -378,6 +381,8 @@ private QueryResultsCache(HiveConf configuration) throws IOException {
378381
// Results cache directory should be cleaned up at process termination.
379382
fs.deleteOnExit(cacheDirPath);
380383

384+
isSafeCacheWriteEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED);
385+
381386
maxCacheSize = conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE);
382387
maxEntrySize = conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_SIZE);
383388
maxEntryLifetime = conf.getTimeVar(
@@ -415,6 +420,14 @@ public Path getCacheDirPath() {
415420
return cacheDirPath;
416421
}
417422

423+
public void setSafeDir(String dirName) {
424+
safeDir = dirName;
425+
}
426+
427+
public String getSafeDir() {
428+
return safeDir;
429+
}
430+
418431
/**
419432
* Check if the cache contains an entry for the requested LookupInfo.
420433
* @param request
@@ -513,6 +526,22 @@ public CacheEntry addToCache(QueryInfo queryInfo, ValidTxnWriteIdList txnWriteId
513526
return addedEntry;
514527
}
515528

529+
public void removeInvalidStaleFiles(FileSystem fs, Set<FileStatus> files) {
530+
rwLock.writeLock().lock();
531+
try {
532+
for (FileStatus f : files) {
533+
try {
534+
fs.delete(f.getPath(), true);
535+
} catch (IOException e) {
536+
LOG.warn("Failed to clean up stale invalid file: {}",
537+
f.getPath(), e);
538+
}
539+
}
540+
} finally {
541+
rwLock.writeLock().unlock();
542+
}
543+
}
544+
516545
/**
517546
* Updates a pending cache entry with a FetchWork result from a finished query.
518547
* If successful the cache entry will be set to valid status and be usable for cached queries.
@@ -549,6 +578,39 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
549578
return false;
550579
}
551580

581+
if (isSafeCacheWriteEnabled) {
582+
Path resultDir = new Path(cacheDirPath, UUID.randomUUID().toString());
583+
FileSystem cacheFs = resultDir.getFileSystem(conf);
584+
cacheFs.mkdirs(resultDir);
585+
586+
Set<FileStatus> cacheFilesToFetch = new HashSet<>();
587+
rwLock.writeLock().lock();
588+
boolean succeeded = true;
589+
try {
590+
for (FileStatus fs : fetchWork.getFilesToFetch()) {
591+
FileSystem srcFs = fs.getPath().getFileSystem(conf);
592+
Path srcFile = fs.getPath();
593+
Path destFile = new Path(resultDir,
594+
new Path(fs.getPath().toString().substring(safeDir.length() + 1)));
595+
succeeded = FileUtil.copy(srcFs, srcFile, cacheFs, destFile, false, conf);
596+
if (!succeeded) {
597+
throw new IOException("File copy failed for " + srcFile + " -> " + destFile);
598+
}
599+
cacheFilesToFetch.add(cacheFs.getFileStatus(destFile));
600+
}
601+
} catch (IOException e) {
602+
LOG.warn("Failed to write cache entry to {}", resultDir, e);
603+
} finally {
604+
rwLock.writeLock().unlock();
605+
}
606+
if (!succeeded) {
607+
removeInvalidStaleFiles(cacheFs, cacheFilesToFetch);
608+
return false;
609+
}
610+
fetchWork.setFilesToFetch(cacheFilesToFetch);
611+
fetchWork.setTblDir(new Path(resultDir, fetchWork.getTblDir().toString().substring(safeDir.length() + 1)));
612+
}
613+
552614
// Synchronize on the cache entry so that no one else can invalidate this entry
553615
// while we are in the process of setting it to valid.
554616
synchronized (cacheEntry) {

ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7500,19 +7500,24 @@ protected Table getTargetTable(QB qb, String dest) throws SemanticException {
75007500
}
75017501

75027502
private Path getDestinationFilePath(QB qb, final String destinationFile, boolean isMmTable) {
7503+
Path defaultPath = new Path(destinationFile);
75037504
if (this.isResultsCacheEnabled() && this.queryTypeCanUseCache(qb)) {
75047505
assert (!isMmTable);
75057506
QueryResultsCache instance = QueryResultsCache.getInstance();
75067507
// QueryResultsCache should have been initialized by now
75077508
if (instance != null) {
7508-
Path resultCacheTopDir = instance.getCacheDirPath();
7509-
String dirName = UUID.randomUUID().toString();
7510-
Path resultDir = new Path(resultCacheTopDir, dirName);
7511-
this.ctx.setFsResultCacheDirs(resultDir);
7512-
return resultDir;
7509+
if (!conf.getBoolVar(ConfVars.HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED)) {
7510+
Path resultCacheTopDir = instance.getCacheDirPath();
7511+
String dirName = UUID.randomUUID().toString();
7512+
Path resultDir = new Path(resultCacheTopDir, dirName);
7513+
this.ctx.setFsResultCacheDirs(resultDir);
7514+
return resultDir;
7515+
} else {
7516+
instance.setSafeDir(defaultPath.toString());
7517+
}
75137518
}
75147519
}
7515-
return new Path(destinationFile);
7520+
return defaultPath;
75167521
}
75177522

75187523
@SuppressWarnings("nls")
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hive.ql;
20+
21+
import java.io.IOException;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.ScheduledExecutorService;
24+
import java.util.concurrent.TimeUnit;
25+
26+
import org.apache.hadoop.fs.FileSystem;
27+
import org.apache.hadoop.fs.Path;
28+
import org.apache.hadoop.hive.conf.HiveConf;
29+
import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
30+
import org.apache.hadoop.hive.ql.session.SessionState;
31+
import org.apache.hive.testutils.HiveTestEnvSetup;
32+
33+
import org.junit.After;
34+
import org.junit.AfterClass;
35+
import org.junit.Assert;
36+
import org.junit.Before;
37+
import org.junit.BeforeClass;
38+
import org.junit.ClassRule;
39+
import org.junit.Rule;
40+
import org.junit.Test;
41+
import org.junit.rules.TestRule;
42+
43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
45+
46+
public class TestCachedResults {
47+
48+
private static final Logger LOG = LoggerFactory.getLogger(TestCachedResults.class);
49+
private static final long MAX_ALLOWED_CACHE_SIZE = 1_000_000L;
50+
51+
private static final String Q_WINDOW =
52+
"SELECT t1.id, "
53+
+ "SUM(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_sum, "
54+
+ "AVG(t2.id) OVER (PARTITION BY t1.id % 10) AS window_avg, "
55+
+ "COUNT(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_cnt "
56+
+ "FROM tab t1 "
57+
+ "JOIN tab t2 ON t1.id % 10 = t2.id % 10 "
58+
+ "WHERE t1.id <= 300";
59+
60+
private static final String Q_JOIN =
61+
"SELECT base.id, base.bucket, agg.bucket_avg "
62+
+ "FROM ( SELECT id, id % 10 AS bucket FROM tab WHERE id <= 500 ) base "
63+
+ "JOIN ( "
64+
+ " SELECT id % 10 AS bucket, AVG(id) AS bucket_avg, COUNT(*) AS cnt "
65+
+ " FROM tab GROUP BY id % 10 "
66+
+ ") agg ON base.bucket = agg.bucket "
67+
+ "ORDER BY base.id";
68+
69+
private static final String Q_CTE =
70+
"WITH base AS ( "
71+
+ " SELECT id, id % 2 AS is_even, id % 5 AS mod5, id % 10 AS mod10 FROM tab "
72+
+ "), joined AS ( "
73+
+ " SELECT a.id AS a_id, b.id AS b_id, a.mod5, a.mod10, (a.id * b.id) AS product "
74+
+ " FROM base a JOIN base b ON a.mod5 = b.mod5 AND a.is_even = b.is_even "
75+
+ " WHERE a.id <= 200 AND b.id <= 200 "
76+
+ ") "
77+
+ "SELECT mod5, mod10, COUNT(*) AS cnt, SUM(product) AS total_product, "
78+
+ "MAX(product) AS max_product, MIN(a_id) AS min_a "
79+
+ "FROM joined GROUP BY mod5, mod10 ORDER BY mod5, mod10";
80+
81+
@ClassRule
82+
public static HiveTestEnvSetup envSetup = new HiveTestEnvSetup();
83+
84+
private static HiveConf conf;
85+
86+
private ScheduledExecutorService scheduler;
87+
private volatile long maxCacheSize;
88+
89+
@BeforeClass
90+
public static void setUp() throws Exception {
91+
conf = envSetup.getTestCtx().hiveConf;
92+
HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED, true);
93+
HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY, "/tmp/hive/cache");
94+
HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_NONTRANSACTIONAL_TABLES_ENABLED, true);
95+
HiveConf.setLongVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE, MAX_ALLOWED_CACHE_SIZE);
96+
LOG.info("max allowed cache size: {}", conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE));
97+
HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
98+
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
99+
HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
100+
SessionState.start(conf);
101+
createAndPopulateTables();
102+
}
103+
104+
private static void createAndPopulateTables() throws Exception {
105+
IDriver driver = DriverFactory.newDriver(conf);
106+
driver.run("DROP TABLE IF EXISTS tab");
107+
driver.run("CREATE TABLE tab (id INT)");
108+
driver.run(
109+
"INSERT INTO TABLE tab SELECT pos + 1 AS id FROM ( "
110+
+ "SELECT posexplode(split(space(999), ' ')) AS (pos, val) ) t");
111+
}
112+
113+
@AfterClass
114+
public static void afterClass() throws Exception {
115+
DriverFactory.newDriver(conf).run("DROP TABLE IF EXISTS tab");
116+
}
117+
118+
@Before
119+
public void beforeEach() {
120+
scheduler = Executors.newSingleThreadScheduledExecutor();
121+
maxCacheSize = 0;
122+
}
123+
124+
@After
125+
public void afterEach() throws Exception {
126+
QueryResultsCache.cleanupInstance();
127+
Path cachePath = new Path(conf.getVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY));
128+
try {
129+
FileSystem fs = cachePath.getFileSystem(conf);
130+
fs.delete(cachePath, true);
131+
} catch (IOException e) {
132+
LOG.warn("Failed to clean up cache directory: {}", cachePath, e);
133+
}
134+
scheduler.shutdownNow();
135+
scheduler.awaitTermination(1, TimeUnit.SECONDS);
136+
}
137+
138+
private void executeQueries(IDriver driver) throws Exception {
139+
for (int i = 0; i < 2; i++) {
140+
driver.run(Q_WINDOW);
141+
driver.run(Q_JOIN);
142+
driver.run(Q_CTE);
143+
}
144+
}
145+
146+
@Test
147+
public void testSafeCacheWrite() throws Exception {
148+
runCacheScenario(true);
149+
}
150+
151+
@Test
152+
public void testUnsafeCacheWrite() throws Exception {
153+
runCacheScenario(false);
154+
}
155+
156+
private void runCacheScenario(boolean safeCacheWrite) throws Exception {
157+
HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED, safeCacheWrite);
158+
startCacheMonitor(1);
159+
executeQueries(DriverFactory.newDriver(conf));
160+
stopCacheMonitor();
161+
Assert.assertNotEquals("cache size should have grown", 0, maxCacheSize);
162+
if (safeCacheWrite) {
163+
LOG.info("Maximum cache size in safe mode: {}", maxCacheSize);
164+
Assert.assertFalse("max cache size should stay within limit", maxCacheSize > MAX_ALLOWED_CACHE_SIZE);
165+
} else {
166+
LOG.info("Maximum cache size in non-safe mode: {}", maxCacheSize);
167+
Assert.assertFalse("max cache size should exceed limit when unsafe", maxCacheSize < MAX_ALLOWED_CACHE_SIZE);
168+
}
169+
}
170+
171+
private void startCacheMonitor(long intervalMs) {
172+
Path cachePath = new Path(conf.getVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY));
173+
scheduler.scheduleAtFixedRate(() -> {
174+
try {
175+
long size = cachePath.getFileSystem(conf).getContentSummary(cachePath).getLength();
176+
maxCacheSize = Math.max(maxCacheSize, size);
177+
} catch (IOException e) {
178+
LOG.debug("cache path not readable yet: {}", cachePath, e);
179+
}
180+
}, 0, intervalMs, TimeUnit.MILLISECONDS);
181+
}
182+
183+
private void stopCacheMonitor() throws InterruptedException {
184+
scheduler.shutdown();
185+
scheduler.awaitTermination(2, TimeUnit.SECONDS);
186+
}
187+
}

0 commit comments

Comments
 (0)