Skip to content

Commit 0cd2720

Browse files
committed
HIVE-29465: Prevent excessive query results cache usage at runtime
1 parent f693852 commit 0cd2720

4 files changed

Lines changed: 283 additions & 7 deletions

File tree

common/src/java/org/apache/hadoop/hive/conf/HiveConf.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5670,7 +5670,9 @@ public static enum ConfVars {
56705670
HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true,
56715671
"If the query results cache is enabled. This will keep results of previously executed queries " +
56725672
"to be reused if the same query is executed again."),
5673-
5673+
HIVE_QUERY_RESULTS_SAFE_CACHE_ENABLED("hive.query.results.safe.cache.enabled", true,
    "If the query results safe cache is enabled. This will safely write to the cache directory by first " +
    "evaluating that the cache entry does not overfill the cache directory before writing it there."),
56745676
HIVE_QUERY_RESULTS_CACHE_NONTRANSACTIONAL_TABLES_ENABLED("hive.query.results.cache.nontransactional.tables.enabled", false,
56755677
"If the query results cache is enabled for queries involving non-transactional tables." +
56765678
"Users who enable this setting should be willing to tolerate some amount of stale results in the cache."),

ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.hadoop.fs.ContentSummary;
5151
import org.apache.hadoop.fs.FileStatus;
5252
import org.apache.hadoop.fs.FileSystem;
53+
import org.apache.hadoop.fs.FileUtil;
5354
import org.apache.hadoop.fs.Path;
5455
import org.apache.hadoop.fs.permission.FsPermission;
5556
import org.apache.hadoop.hive.common.metrics.common.Metrics;
@@ -358,6 +359,7 @@ public Stream<String> getTableNames() {
358359
private long maxEntryLifetime;
359360
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
360361
private ScheduledFuture<?> invalidationPollFuture;
362+
private String safeDir;
361363

362364
private QueryResultsCache(HiveConf configuration) throws IOException {
363365
this.conf = configuration;
@@ -415,6 +417,14 @@ public Path getCacheDirPath() {
415417
return cacheDirPath;
416418
}
417419

420+
public void setSafeDir(String dirName) {
421+
safeDir = dirName;
422+
}
423+
424+
public String getSafeDir() {
425+
return safeDir;
426+
}
427+
418428
/**
419429
* Check if the cache contains an entry for the requested LookupInfo.
420430
* @param request
@@ -549,6 +559,23 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
549559
return false;
550560
}
551561

562+
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_SAFE_CACHE_ENABLED)) {
563+
Path resultDir = new Path(cacheDirPath, UUID.randomUUID().toString());
564+
FileSystem cacheFs = resultDir.getFileSystem(conf);
565+
cacheFs.mkdirs(resultDir);
566+
567+
Set<FileStatus> cacheFilesToFetch = new HashSet<>();
568+
569+
for (FileStatus fs : fetchWork.getFilesToFetch()) {
570+
Path destFile = new Path(resultDir,
571+
new Path(fs.getPath().toString().substring(safeDir.length() + 1)));
572+
FileUtil.copy(fs.getPath().getFileSystem(conf), fs.getPath(), cacheFs, destFile, false, conf);
573+
cacheFilesToFetch.add(cacheFs.getFileStatus(destFile));
574+
}
575+
fetchWork.setFilesToFetch(cacheFilesToFetch);
576+
fetchWork.setTblDir(new Path(resultDir, fetchWork.getTblDir().toString().substring(safeDir.length() + 1)));
577+
}
578+
552579
// Synchronize on the cache entry so that no one else can invalidate this entry
553580
// while we are in the process of setting it to valid.
554581
synchronized (cacheEntry) {

ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7500,19 +7500,24 @@ protected Table getTargetTable(QB qb, String dest) throws SemanticException {
75007500
}
75017501

75027502
private Path getDestinationFilePath(QB qb, final String destinationFile, boolean isMmTable) {
7503+
Path defaultPath = new Path(destinationFile);
75037504
if (this.isResultsCacheEnabled() && this.queryTypeCanUseCache(qb)) {
75047505
assert (!isMmTable);
75057506
QueryResultsCache instance = QueryResultsCache.getInstance();
75067507
// QueryResultsCache should have been initialized by now
75077508
if (instance != null) {
7508-
Path resultCacheTopDir = instance.getCacheDirPath();
7509-
String dirName = UUID.randomUUID().toString();
7510-
Path resultDir = new Path(resultCacheTopDir, dirName);
7511-
this.ctx.setFsResultCacheDirs(resultDir);
7512-
return resultDir;
7509+
if (!conf.getBoolVar(ConfVars.HIVE_QUERY_RESULTS_SAFE_CACHE_ENABLED)) {
7510+
Path resultCacheTopDir = instance.getCacheDirPath();
7511+
String dirName = UUID.randomUUID().toString();
7512+
Path resultDir = new Path(resultCacheTopDir, dirName);
7513+
this.ctx.setFsResultCacheDirs(resultDir);
7514+
return resultDir;
7515+
} else {
7516+
instance.setSafeDir(defaultPath.toString());
7517+
}
75137518
}
75147519
}
7515-
return new Path(destinationFile);
7520+
return defaultPath;
75167521
}
75177522

75187523
@SuppressWarnings("nls")
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hive.ql;
20+
21+
import org.apache.hadoop.hive.conf.HiveConf;
22+
import org.apache.hadoop.hive.ql.session.SessionState;
23+
import org.apache.hive.testutils.HiveTestEnvSetup;
24+
import org.junit.*;
25+
import org.junit.rules.TestRule;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import java.io.File;
30+
import java.util.concurrent.*;
31+
32+
public class TestCachedResults {
33+
34+
private static final Logger LOG = LoggerFactory.getLogger(TestCachedResults.class);
35+
36+
@ClassRule
37+
public static HiveTestEnvSetup env_setup = new HiveTestEnvSetup();
38+
39+
@Rule
40+
public TestRule methodRule = env_setup.getMethodRule();
41+
42+
private static HiveConf conf;
43+
private static String cacheDir;
44+
45+
private ScheduledExecutorService scheduler;
46+
private long maxCacheSize = 0;
47+
private static long maxAllowedCacheSize = 1000000;
48+
49+
50+
@BeforeClass
51+
public static void setUp() throws Exception {
52+
conf = env_setup.getTestCtx().hiveConf;
53+
54+
HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED, true);
55+
HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY, "/tmp/hive/cache");
56+
HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_NONTRANSACTIONAL_TABLES_ENABLED, true);
57+
conf.setLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE, maxAllowedCacheSize);
58+
LOG.info("max allowed cache size : {}", conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE));
59+
60+
cacheDir = conf.getVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY);
61+
62+
conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
63+
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
64+
HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
65+
66+
SessionState.start(conf);
67+
createAndPopulateTables();
68+
}
69+
70+
public static void createAndPopulateTables() throws Exception {
71+
IDriver driver = DriverFactory.newDriver(conf);
72+
runQuery(driver, "DROP TABLE IF EXISTS tab");
73+
runQuery(driver, "CREATE TABLE tab (id INT)");
74+
runQuery(driver,
75+
"INSERT INTO TABLE tab " +
76+
"SELECT pos + 1 AS id FROM (" +
77+
" SELECT posexplode(split(space(999), ' ')) AS (pos, val)" +
78+
") t"
79+
);
80+
}
81+
82+
@AfterClass
83+
public static void afterClass() throws Exception {
84+
IDriver driver = DriverFactory.newDriver(conf);
85+
driver.run("DROP TABLE IF EXISTS tab");
86+
}
87+
88+
@Before
89+
public void beforeEach() {
90+
scheduler = Executors.newSingleThreadScheduledExecutor();
91+
maxCacheSize = 0;
92+
}
93+
94+
@After
95+
public void afterEach() throws InterruptedException {
96+
scheduler.shutdownNow();
97+
scheduler.awaitTermination(1, TimeUnit.SECONDS);
98+
}
99+
100+
private void executeQueries(IDriver driver) throws Exception {
101+
102+
runQuery(driver,
103+
"SELECT t1.id, " +
104+
" SUM(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_sum, " +
105+
" AVG(t2.id) OVER (PARTITION BY t1.id % 10) AS window_avg, " +
106+
" COUNT(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_cnt " +
107+
"FROM tab t1 " +
108+
"JOIN tab t2 ON t1.id % 10 = t2.id % 10 " +
109+
"WHERE t1.id <= 300"
110+
);
111+
// running same query to check for cache hit while debugging
112+
runQuery(driver,
113+
"SELECT t1.id, " +
114+
" SUM(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_sum, " +
115+
" AVG(t2.id) OVER (PARTITION BY t1.id % 10) AS window_avg, " +
116+
" COUNT(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_cnt " +
117+
"FROM tab t1 " +
118+
"JOIN tab t2 ON t1.id % 10 = t2.id % 10 " +
119+
"WHERE t1.id <= 300"
120+
);
121+
runQuery(driver,
122+
"SELECT base.id, base.bucket, agg.bucket_avg " +
123+
"FROM ( " +
124+
" SELECT id, id % 10 AS bucket FROM tab WHERE id <= 500 " +
125+
") base " +
126+
"JOIN ( " +
127+
" SELECT id % 10 AS bucket, AVG(id) AS bucket_avg, COUNT(*) AS cnt " +
128+
" FROM tab " +
129+
" GROUP BY id % 10 " +
130+
") agg ON base.bucket = agg.bucket " +
131+
"ORDER BY base.id"
132+
);
133+
// running same query to check for cache hit while debugging
134+
runQuery(driver,
135+
"SELECT base.id, base.bucket, agg.bucket_avg " +
136+
"FROM ( " +
137+
" SELECT id, id % 10 AS bucket FROM tab WHERE id <= 500 " +
138+
") base " +
139+
"JOIN ( " +
140+
" SELECT id % 10 AS bucket, AVG(id) AS bucket_avg, COUNT(*) AS cnt " +
141+
" FROM tab " +
142+
" GROUP BY id % 10 " +
143+
") agg ON base.bucket = agg.bucket " +
144+
"ORDER BY base.id"
145+
);
146+
runQuery(driver,
147+
"WITH base AS ( " +
148+
" SELECT id, id % 2 AS is_even, id % 5 AS mod5, id % 10 AS mod10 " +
149+
" FROM tab " +
150+
"), " +
151+
"joined AS ( " +
152+
" SELECT a.id AS a_id, b.id AS b_id, " +
153+
" a.mod5, a.mod10, " +
154+
" (a.id * b.id) AS product " +
155+
" FROM base a " +
156+
" JOIN base b ON a.mod5 = b.mod5 AND a.is_even = b.is_even " +
157+
" WHERE a.id <= 200 AND b.id <= 200 " +
158+
") " +
159+
"SELECT mod5, mod10, " +
160+
" COUNT(*) AS cnt, " +
161+
" SUM(product) AS total_product, " +
162+
" MAX(product) AS max_product, " +
163+
" MIN(a_id) AS min_a " +
164+
"FROM joined " +
165+
"GROUP BY mod5, mod10 " +
166+
"ORDER BY mod5, mod10"
167+
);
168+
// running same query to check for cache hit while debugging
169+
runQuery(driver,
170+
"WITH base AS ( " +
171+
" SELECT id, id % 2 AS is_even, id % 5 AS mod5, id % 10 AS mod10 " +
172+
" FROM tab " +
173+
"), " +
174+
"joined AS ( " +
175+
" SELECT a.id AS a_id, b.id AS b_id, " +
176+
" a.mod5, a.mod10, " +
177+
" (a.id * b.id) AS product " +
178+
" FROM base a " +
179+
" JOIN base b ON a.mod5 = b.mod5 AND a.is_even = b.is_even " +
180+
" WHERE a.id <= 200 AND b.id <= 200 " +
181+
") " +
182+
"SELECT mod5, mod10, " +
183+
" COUNT(*) AS cnt, " +
184+
" SUM(product) AS total_product, " +
185+
" MAX(product) AS max_product, " +
186+
" MIN(a_id) AS min_a " +
187+
"FROM joined " +
188+
"GROUP BY mod5, mod10 " +
189+
"ORDER BY mod5, mod10"
190+
);
191+
}
192+
193+
@Test
194+
public void testCacheSizeRuntime() throws Exception {
195+
HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_SAFE_CACHE_ENABLED, false);
196+
startCacheMonitor(1);
197+
IDriver driver = DriverFactory.newDriver(conf);
198+
executeQueries(driver);
199+
stopCacheMonitor();
200+
LOG.info("Maximum cache size in non safe mode went upto : {}", maxCacheSize);
201+
Assert.assertFalse("max cache size recorded should be greater than max allowed cache size", maxCacheSize < maxAllowedCacheSize);
202+
}
203+
204+
@Test
205+
public void testSafeCacheSizeRuntime() throws Exception {
206+
startCacheMonitor(1);
207+
IDriver driver = DriverFactory.newDriver(conf);
208+
executeQueries(driver);
209+
stopCacheMonitor();
210+
LOG.info("Maximum cache size in safe mode went upto : {}", maxCacheSize);
211+
Assert.assertFalse("max cache size recorded should be smaller than max allowed cache size", maxCacheSize > maxAllowedCacheSize);
212+
}
213+
214+
private void startCacheMonitor(long intervalMs) {
215+
scheduler.scheduleAtFixedRate(() -> {
216+
long size = getFolderSize(new File(cacheDir));
217+
long ts = System.currentTimeMillis();
218+
maxCacheSize = Math.max(maxCacheSize, size);
219+
}, 0, intervalMs, TimeUnit.MILLISECONDS);
220+
}
221+
222+
private void stopCacheMonitor() throws InterruptedException {
223+
scheduler.shutdown();
224+
scheduler.awaitTermination(2, TimeUnit.SECONDS);
225+
}
226+
227+
private static void runQuery(IDriver driver, String sql) throws Exception {
228+
driver.run(sql);
229+
}
230+
231+
private static long getFolderSize(File folder) {
232+
if (folder == null || !folder.exists()) return 0L;
233+
long size = 0;
234+
File[] files = folder.listFiles();
235+
if (files != null) {
236+
for (File f : files) {
237+
size += f.isDirectory() ? getFolderSize(f) : f.length();
238+
}
239+
}
240+
return size;
241+
}
242+
}

0 commit comments

Comments
 (0)