Skip to content

Commit 8a94cba

Browse files
committed
HIVE-29465: Prevent excessive query results cache usage at runtime
1 parent f693852 commit 8a94cba

4 files changed

Lines changed: 301 additions & 7 deletions

File tree

common/src/java/org/apache/hadoop/hive/conf/HiveConf.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5670,7 +5670,9 @@ public static enum ConfVars {
56705670
HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true,
56715671
"If the query results cache is enabled. This will keep results of previously executed queries " +
56725672
"to be reused if the same query is executed again."),
5673-
5673+
HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED("hive.query.results.safe.cache.write.enabled", false,
5674+
"If the query results safe cache is enabled. This will safely write to cache directory by first evaluating " +
5675+
"the cache entry is not overspilling the cache directory before writing it to the cache directory."),
56745676
HIVE_QUERY_RESULTS_CACHE_NONTRANSACTIONAL_TABLES_ENABLED("hive.query.results.cache.nontransactional.tables.enabled", false,
56755677
"If the query results cache is enabled for queries involving non-transactional tables." +
56765678
"Users who enable this setting should be willing to tolerate some amount of stale results in the cache."),

ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.hadoop.fs.ContentSummary;
5151
import org.apache.hadoop.fs.FileStatus;
5252
import org.apache.hadoop.fs.FileSystem;
53+
import org.apache.hadoop.fs.FileUtil;
5354
import org.apache.hadoop.fs.Path;
5455
import org.apache.hadoop.fs.permission.FsPermission;
5556
import org.apache.hadoop.hive.common.metrics.common.Metrics;
@@ -353,11 +354,13 @@ public Stream<String> getTableNames() {
353354
private Path cacheDirPath;
354355
private Path zeroRowsPath;
355356
private long cacheSize = 0;
357+
private boolean isSafeCacheWriteEnabled;
356358
private long maxCacheSize;
357359
private long maxEntrySize;
358360
private long maxEntryLifetime;
359361
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
360362
private ScheduledFuture<?> invalidationPollFuture;
363+
private String safeDir;
361364

362365
private QueryResultsCache(HiveConf configuration) throws IOException {
363366
this.conf = configuration;
@@ -378,6 +381,8 @@ private QueryResultsCache(HiveConf configuration) throws IOException {
378381
// Results cache directory should be cleaned up at process termination.
379382
fs.deleteOnExit(cacheDirPath);
380383

384+
isSafeCacheWriteEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED);
385+
381386
maxCacheSize = conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE);
382387
maxEntrySize = conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_SIZE);
383388
maxEntryLifetime = conf.getTimeVar(
@@ -415,6 +420,14 @@ public Path getCacheDirPath() {
415420
return cacheDirPath;
416421
}
417422

423+
public void setSafeDir(String dirName) {
424+
safeDir = dirName;
425+
}
426+
427+
public String getSafeDir() {
428+
return safeDir;
429+
}
430+
418431
/**
419432
* Check if the cache contains an entry for the requested LookupInfo.
420433
* @param request
@@ -549,6 +562,23 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
549562
return false;
550563
}
551564

565+
if (isSafeCacheWriteEnabled) {
566+
Path resultDir = new Path(cacheDirPath, UUID.randomUUID().toString());
567+
FileSystem cacheFs = resultDir.getFileSystem(conf);
568+
cacheFs.mkdirs(resultDir);
569+
570+
Set<FileStatus> cacheFilesToFetch = new HashSet<>();
571+
572+
for (FileStatus fs : fetchWork.getFilesToFetch()) {
573+
Path destFile = new Path(resultDir,
574+
new Path(fs.getPath().toString().substring(safeDir.length() + 1)));
575+
FileUtil.copy(fs.getPath().getFileSystem(conf), fs.getPath(), cacheFs, destFile, false, conf);
576+
cacheFilesToFetch.add(cacheFs.getFileStatus(destFile));
577+
}
578+
fetchWork.setFilesToFetch(cacheFilesToFetch);
579+
fetchWork.setTblDir(new Path(resultDir, fetchWork.getTblDir().toString().substring(safeDir.length() + 1)));
580+
}
581+
552582
// Synchronize on the cache entry so that no one else can invalidate this entry
553583
// while we are in the process of setting it to valid.
554584
synchronized (cacheEntry) {

ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7500,19 +7500,24 @@ protected Table getTargetTable(QB qb, String dest) throws SemanticException {
75007500
}
75017501

75027502
private Path getDestinationFilePath(QB qb, final String destinationFile, boolean isMmTable) {
7503+
Path defaultPath = new Path(destinationFile);
75037504
if (this.isResultsCacheEnabled() && this.queryTypeCanUseCache(qb)) {
75047505
assert (!isMmTable);
75057506
QueryResultsCache instance = QueryResultsCache.getInstance();
75067507
// QueryResultsCache should have been initialized by now
75077508
if (instance != null) {
7508-
Path resultCacheTopDir = instance.getCacheDirPath();
7509-
String dirName = UUID.randomUUID().toString();
7510-
Path resultDir = new Path(resultCacheTopDir, dirName);
7511-
this.ctx.setFsResultCacheDirs(resultDir);
7512-
return resultDir;
7509+
if (!conf.getBoolVar(ConfVars.HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED)) {
7510+
Path resultCacheTopDir = instance.getCacheDirPath();
7511+
String dirName = UUID.randomUUID().toString();
7512+
Path resultDir = new Path(resultCacheTopDir, dirName);
7513+
this.ctx.setFsResultCacheDirs(resultDir);
7514+
return resultDir;
7515+
} else {
7516+
instance.setSafeDir(defaultPath.toString());
7517+
}
75137518
}
75147519
}
7515-
return new Path(destinationFile);
7520+
return defaultPath;
75167521
}
75177522

75187523
@SuppressWarnings("nls")
Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hive.ql;
20+
21+
import java.io.File;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.ScheduledExecutorService;
24+
import java.util.concurrent.TimeUnit;
25+
26+
import org.apache.hadoop.hive.conf.HiveConf;
27+
import org.apache.hadoop.hive.ql.session.SessionState;
28+
import org.apache.hive.testutils.HiveTestEnvSetup;
29+
30+
import org.junit.After;
31+
import org.junit.Assert;
32+
import org.junit.AfterClass;
33+
import org.junit.Before;
34+
import org.junit.BeforeClass;
35+
import org.junit.ClassRule;
36+
import org.junit.Rule;
37+
import org.junit.Test;
38+
39+
import org.junit.rules.TestRule;
40+
import org.slf4j.Logger;
41+
import org.slf4j.LoggerFactory;
42+
43+
44+
public class TestCachedResults {
45+
46+
private static final Logger LOG = LoggerFactory.getLogger(TestCachedResults.class);
47+
48+
@ClassRule
49+
public static HiveTestEnvSetup envSetup = new HiveTestEnvSetup();
50+
51+
@Rule
52+
public TestRule methodRule = envSetup.getMethodRule();
53+
54+
private static HiveConf conf;
55+
private static String cacheDir;
56+
57+
private ScheduledExecutorService scheduler;
58+
private long maxCacheSize = 0;
59+
private static long maxAllowedCacheSize = 1000000;
60+
61+
62+
@BeforeClass
63+
public static void setUp() throws Exception {
64+
conf = envSetup.getTestCtx().hiveConf;
65+
66+
HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED, true);
67+
HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY, "/tmp/hive/cache");
68+
HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_NONTRANSACTIONAL_TABLES_ENABLED, true);
69+
HiveConf.setLongVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE, maxAllowedCacheSize);
70+
LOG.info("max allowed cache size : {}", conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE));
71+
72+
cacheDir = conf.getVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY);
73+
74+
HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
75+
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
76+
HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
77+
78+
SessionState.start(conf);
79+
createAndPopulateTables();
80+
}
81+
82+
public static void createAndPopulateTables() throws Exception {
83+
IDriver driver = DriverFactory.newDriver(conf);
84+
runQuery(driver, "DROP TABLE IF EXISTS tab");
85+
runQuery(driver, "CREATE TABLE tab (id INT)");
86+
runQuery(driver,
87+
"INSERT INTO TABLE tab " +
88+
"SELECT pos + 1 AS id FROM (" +
89+
" SELECT posexplode(split(space(999), ' ')) AS (pos, val)" +
90+
") t"
91+
);
92+
}
93+
94+
@AfterClass
95+
public static void afterClass() throws Exception {
96+
IDriver driver = DriverFactory.newDriver(conf);
97+
driver.run("DROP TABLE IF EXISTS tab");
98+
}
99+
100+
@Before
101+
public void beforeEach() {
102+
scheduler = Executors.newSingleThreadScheduledExecutor();
103+
maxCacheSize = 0;
104+
}
105+
106+
@After
107+
public void afterEach() throws InterruptedException {
108+
scheduler.shutdownNow();
109+
scheduler.awaitTermination(1, TimeUnit.SECONDS);
110+
}
111+
112+
private void executeQueries(IDriver driver) throws Exception {
113+
114+
runQuery(driver,
115+
"SELECT t1.id, " +
116+
" SUM(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_sum, " +
117+
" AVG(t2.id) OVER (PARTITION BY t1.id % 10) AS window_avg, " +
118+
" COUNT(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_cnt " +
119+
"FROM tab t1 " +
120+
"JOIN tab t2 ON t1.id % 10 = t2.id % 10 " +
121+
"WHERE t1.id <= 300"
122+
);
123+
// running same query to check for cache hit while debugging
124+
runQuery(driver,
125+
"SELECT t1.id, " +
126+
" SUM(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_sum, " +
127+
" AVG(t2.id) OVER (PARTITION BY t1.id % 10) AS window_avg, " +
128+
" COUNT(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_cnt " +
129+
"FROM tab t1 " +
130+
"JOIN tab t2 ON t1.id % 10 = t2.id % 10 " +
131+
"WHERE t1.id <= 300"
132+
);
133+
runQuery(driver,
134+
"SELECT base.id, base.bucket, agg.bucket_avg " +
135+
"FROM ( " +
136+
" SELECT id, id % 10 AS bucket FROM tab WHERE id <= 500 " +
137+
") base " +
138+
"JOIN ( " +
139+
" SELECT id % 10 AS bucket, AVG(id) AS bucket_avg, COUNT(*) AS cnt " +
140+
" FROM tab " +
141+
" GROUP BY id % 10 " +
142+
") agg ON base.bucket = agg.bucket " +
143+
"ORDER BY base.id"
144+
);
145+
// running same query to check for cache hit while debugging
146+
runQuery(driver,
147+
"SELECT base.id, base.bucket, agg.bucket_avg " +
148+
"FROM ( " +
149+
" SELECT id, id % 10 AS bucket FROM tab WHERE id <= 500 " +
150+
") base " +
151+
"JOIN ( " +
152+
" SELECT id % 10 AS bucket, AVG(id) AS bucket_avg, COUNT(*) AS cnt " +
153+
" FROM tab " +
154+
" GROUP BY id % 10 " +
155+
") agg ON base.bucket = agg.bucket " +
156+
"ORDER BY base.id"
157+
);
158+
runQuery(driver,
159+
"WITH base AS ( " +
160+
" SELECT id, id % 2 AS is_even, id % 5 AS mod5, id % 10 AS mod10 " +
161+
" FROM tab " +
162+
"), " +
163+
"joined AS ( " +
164+
" SELECT a.id AS a_id, b.id AS b_id, " +
165+
" a.mod5, a.mod10, " +
166+
" (a.id * b.id) AS product " +
167+
" FROM base a " +
168+
" JOIN base b ON a.mod5 = b.mod5 AND a.is_even = b.is_even " +
169+
" WHERE a.id <= 200 AND b.id <= 200 " +
170+
") " +
171+
"SELECT mod5, mod10, " +
172+
" COUNT(*) AS cnt, " +
173+
" SUM(product) AS total_product, " +
174+
" MAX(product) AS max_product, " +
175+
" MIN(a_id) AS min_a " +
176+
"FROM joined " +
177+
"GROUP BY mod5, mod10 " +
178+
"ORDER BY mod5, mod10"
179+
);
180+
// running same query to check for cache hit while debugging
181+
runQuery(driver,
182+
"WITH base AS ( " +
183+
" SELECT id, id % 2 AS is_even, id % 5 AS mod5, id % 10 AS mod10 " +
184+
" FROM tab " +
185+
"), " +
186+
"joined AS ( " +
187+
" SELECT a.id AS a_id, b.id AS b_id, " +
188+
" a.mod5, a.mod10, " +
189+
" (a.id * b.id) AS product " +
190+
" FROM base a " +
191+
" JOIN base b ON a.mod5 = b.mod5 AND a.is_even = b.is_even " +
192+
" WHERE a.id <= 200 AND b.id <= 200 " +
193+
") " +
194+
"SELECT mod5, mod10, " +
195+
" COUNT(*) AS cnt, " +
196+
" SUM(product) AS total_product, " +
197+
" MAX(product) AS max_product, " +
198+
" MIN(a_id) AS min_a " +
199+
"FROM joined " +
200+
"GROUP BY mod5, mod10 " +
201+
"ORDER BY mod5, mod10"
202+
);
203+
}
204+
205+
@Test
206+
public void testUnsafeCacheWrite() throws Exception {
207+
startCacheMonitor(1);
208+
IDriver driver = DriverFactory.newDriver(conf);
209+
executeQueries(driver);
210+
stopCacheMonitor();
211+
Assert.assertNotEquals("cache size should have grown", 0, maxCacheSize);
212+
LOG.info("Maximum cache size in non safe mode went upto : {}", maxCacheSize);
213+
Assert.assertFalse("max cache size recorded should be greater than max allowed cache size",
214+
maxCacheSize < maxAllowedCacheSize);
215+
}
216+
217+
@Test
218+
public void testSafeCacheWrite() throws Exception {
219+
HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED, true);
220+
startCacheMonitor(1);
221+
IDriver driver = DriverFactory.newDriver(conf);
222+
executeQueries(driver);
223+
stopCacheMonitor();
224+
Assert.assertNotEquals("cache size should have grown", 0, maxCacheSize);
225+
LOG.info("Maximum cache size in safe mode went upto : {}", maxCacheSize);
226+
Assert.assertFalse("max cache size recorded should be smaller than max allowed cache size",
227+
maxCacheSize > maxAllowedCacheSize);
228+
}
229+
230+
private void startCacheMonitor(long intervalMs) {
231+
scheduler.scheduleAtFixedRate(() -> {
232+
long size = getFolderSize(new File(cacheDir));
233+
maxCacheSize = Math.max(maxCacheSize, size);
234+
}, 0, intervalMs, TimeUnit.MILLISECONDS);
235+
}
236+
237+
private void stopCacheMonitor() throws InterruptedException {
238+
scheduler.shutdown();
239+
scheduler.awaitTermination(2, TimeUnit.SECONDS);
240+
}
241+
242+
private static void runQuery(IDriver driver, String sql) throws Exception {
243+
driver.run(sql);
244+
}
245+
246+
private static long getFolderSize(File folder) {
247+
if (folder == null || !folder.exists()) return 0L;
248+
long size = 0;
249+
File[] files = folder.listFiles();
250+
if (files != null) {
251+
for (File f : files) {
252+
size += f.isDirectory() ? getFolderSize(f) : f.length();
253+
}
254+
}
255+
return size;
256+
}
257+
}

0 commit comments

Comments
 (0)