Skip to content

Commit 76ff210

Browse files
feat: add api keys support for ingestion/query authentication (#1620)
* feat: add api keys support for ingestion authentication use api keys as an alternative auth mechanism for ingestion use header `X-API-KEY` in place of basic auth middleware validates the header on ingest action key store in object store at .settings/apikeys/<key-id>.json * uuid for api key * clippy and fmt fixes * deepsource and coderabbit review comments * key type to accomodate query via api key * fix comments * middleware session is registered only after validation succeeds
1 parent 5d57a71 commit 76ff210

9 files changed

Lines changed: 585 additions & 38 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ semver = "1.0"
158158
static-files = "0.2"
159159
thiserror = "2.0"
160160
ulid = { version = "1.0", features = ["serde"] }
161+
uuid = { version = "1", features = ["v4"] }
161162
xxhash-rust = { version = "0.8", features = ["xxh3"] }
162163
futures-core = "0.3.31"
163164
tempfile = "3.20.0"

src/apikeys.rs

Lines changed: 342 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,342 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2025 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
use std::collections::HashMap;
20+
21+
use chrono::{DateTime, Utc};
22+
use once_cell::sync::Lazy;
23+
use serde::{Deserialize, Serialize};
24+
use tokio::sync::RwLock;
25+
use ulid::Ulid;
26+
27+
use crate::{
28+
metastore::metastore_traits::MetastoreObject,
29+
parseable::{DEFAULT_TENANT, PARSEABLE},
30+
storage::object_storage::apikey_json_path,
31+
};
32+
33+
pub static API_KEYS: Lazy<ApiKeyStore> = Lazy::new(|| ApiKeyStore {
34+
keys: RwLock::new(HashMap::new()),
35+
});
36+
37+
#[derive(Debug)]
38+
pub struct ApiKeyStore {
39+
pub keys: RwLock<HashMap<String, HashMap<Ulid, ApiKey>>>,
40+
}
41+
42+
/// Type of API key, determining how it can be used.
43+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
44+
#[serde(rename_all = "lowercase")]
45+
pub enum KeyType {
46+
/// Used as a substitute for basic auth on ingestion endpoints
47+
Ingestion,
48+
/// Used as a substitute for basic auth on query endpoints (global query access)
49+
Query,
50+
}
51+
52+
#[derive(Debug, Clone, Serialize, Deserialize)]
53+
#[serde(rename_all = "camelCase")]
54+
pub struct ApiKey {
55+
pub key_id: Ulid,
56+
pub api_key: String,
57+
pub key_name: String,
58+
#[serde(default = "default_key_type")]
59+
pub key_type: KeyType,
60+
pub created_by: String,
61+
pub created_at: DateTime<Utc>,
62+
pub modified_at: DateTime<Utc>,
63+
#[serde(default)]
64+
pub tenant: Option<String>,
65+
}
66+
67+
fn default_key_type() -> KeyType {
68+
KeyType::Ingestion
69+
}
70+
71+
/// Request body for creating a new API key
72+
#[derive(Debug, Deserialize)]
73+
#[serde(rename_all = "camelCase")]
74+
pub struct CreateApiKeyRequest {
75+
pub key_name: String,
76+
#[serde(default = "default_key_type")]
77+
pub key_type: KeyType,
78+
}
79+
80+
/// Response for list keys (api_key masked to last 4 chars)
81+
#[derive(Debug, Serialize)]
82+
#[serde(rename_all = "camelCase")]
83+
pub struct ApiKeyListEntry {
84+
pub key_id: Ulid,
85+
pub api_key: String,
86+
pub key_name: String,
87+
pub key_type: KeyType,
88+
pub created_by: String,
89+
pub created_at: DateTime<Utc>,
90+
pub modified_at: DateTime<Utc>,
91+
}
92+
93+
impl ApiKey {
94+
pub fn new(
95+
key_name: String,
96+
key_type: KeyType,
97+
created_by: String,
98+
tenant: Option<String>,
99+
) -> Self {
100+
let now = Utc::now();
101+
Self {
102+
key_id: Ulid::new(),
103+
api_key: uuid::Uuid::new_v4().to_string(),
104+
key_name,
105+
key_type,
106+
created_by,
107+
created_at: now,
108+
modified_at: now,
109+
tenant,
110+
}
111+
}
112+
113+
pub fn to_list_entry(&self) -> ApiKeyListEntry {
114+
let masked = if self.api_key.len() >= 4 {
115+
let last4 = &self.api_key[self.api_key.len() - 4..];
116+
format!("****{last4}")
117+
} else {
118+
"****".to_string()
119+
};
120+
ApiKeyListEntry {
121+
key_id: self.key_id,
122+
api_key: masked,
123+
key_name: self.key_name.clone(),
124+
key_type: self.key_type,
125+
created_by: self.created_by.clone(),
126+
created_at: self.created_at,
127+
modified_at: self.modified_at,
128+
}
129+
}
130+
}
131+
132+
impl MetastoreObject for ApiKey {
133+
fn get_object_path(&self) -> String {
134+
apikey_json_path(&self.key_id, &self.tenant).to_string()
135+
}
136+
137+
fn get_object_id(&self) -> String {
138+
self.key_id.to_string()
139+
}
140+
}
141+
142+
impl ApiKeyStore {
143+
/// Load API keys from object store into memory
144+
pub async fn load(&self) -> anyhow::Result<()> {
145+
let api_keys = PARSEABLE.metastore.get_api_keys().await?;
146+
let mut map = self.keys.write().await;
147+
for (tenant_id, keys) in api_keys {
148+
let inner = keys
149+
.into_iter()
150+
.map(|mut k| {
151+
k.tenant = if tenant_id == DEFAULT_TENANT {
152+
None
153+
} else {
154+
Some(tenant_id.clone())
155+
};
156+
(k.key_id, k)
157+
})
158+
.collect();
159+
map.insert(tenant_id, inner);
160+
}
161+
Ok(())
162+
}
163+
164+
/// Create a new API key
165+
pub async fn create(&self, api_key: ApiKey) -> Result<(), ApiKeyError> {
166+
let tenant = api_key.tenant.as_deref().unwrap_or(DEFAULT_TENANT);
167+
let key_id = api_key.key_id;
168+
169+
// Check duplicate name and reserve the slot under the write lock,
170+
// then drop the lock before the async metastore call so we don't
171+
// hold a global lock across an await.
172+
{
173+
let mut map = self.keys.write().await;
174+
if let Some(tenant_keys) = map.get(tenant)
175+
&& tenant_keys.values().any(|k| k.key_name == api_key.key_name)
176+
{
177+
return Err(ApiKeyError::DuplicateKeyName(api_key.key_name));
178+
}
179+
map.entry(tenant.to_owned())
180+
.or_default()
181+
.insert(key_id, api_key.clone());
182+
}
183+
184+
// Persist to storage without holding the lock. On failure, remove
185+
// the reservation so stale entries don't linger in memory.
186+
if let Err(e) = PARSEABLE
187+
.metastore
188+
.put_api_key(&api_key, &api_key.tenant)
189+
.await
190+
{
191+
let mut map = self.keys.write().await;
192+
if let Some(tenant_keys) = map.get_mut(tenant) {
193+
tenant_keys.remove(&key_id);
194+
}
195+
return Err(e.into());
196+
}
197+
198+
Ok(())
199+
}
200+
201+
/// Delete an API key by key_id
202+
pub async fn delete(
203+
&self,
204+
key_id: &Ulid,
205+
tenant_id: &Option<String>,
206+
) -> Result<ApiKey, ApiKeyError> {
207+
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
208+
209+
// Read the key first without removing
210+
let api_key = {
211+
let map = self.keys.read().await;
212+
let tenant_keys = map
213+
.get(tenant)
214+
.ok_or_else(|| ApiKeyError::KeyNotFound(key_id.to_string()))?;
215+
tenant_keys
216+
.get(key_id)
217+
.cloned()
218+
.ok_or_else(|| ApiKeyError::KeyNotFound(key_id.to_string()))?
219+
};
220+
221+
// Delete from storage first
222+
PARSEABLE
223+
.metastore
224+
.delete_api_key(&api_key, tenant_id)
225+
.await?;
226+
227+
// Remove from memory only after successful storage deletion
228+
{
229+
let mut map = self.keys.write().await;
230+
if let Some(tenant_keys) = map.get_mut(tenant) {
231+
tenant_keys.remove(key_id);
232+
}
233+
}
234+
235+
Ok(api_key)
236+
}
237+
238+
/// List all API keys for a tenant (returns masked entries)
239+
pub async fn list(
240+
&self,
241+
tenant_id: &Option<String>,
242+
) -> Result<Vec<ApiKeyListEntry>, ApiKeyError> {
243+
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
244+
let map = self.keys.read().await;
245+
let entries = if let Some(tenant_keys) = map.get(tenant) {
246+
tenant_keys.values().map(|k| k.to_list_entry()).collect()
247+
} else {
248+
vec![]
249+
};
250+
Ok(entries)
251+
}
252+
253+
/// Get a specific API key by key_id (returns full key)
254+
pub async fn get(
255+
&self,
256+
key_id: &Ulid,
257+
tenant_id: &Option<String>,
258+
) -> Result<ApiKey, ApiKeyError> {
259+
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
260+
let map = self.keys.read().await;
261+
let tenant_keys = map
262+
.get(tenant)
263+
.ok_or_else(|| ApiKeyError::KeyNotFound(key_id.to_string()))?;
264+
tenant_keys
265+
.get(key_id)
266+
.cloned()
267+
.ok_or_else(|| ApiKeyError::KeyNotFound(key_id.to_string()))
268+
}
269+
270+
/// Validate an API key against a required key type. Returns true if the
271+
/// key is valid AND its type matches the required type.
272+
/// For multi-tenant: checks the key belongs to the specified tenant.
273+
/// For single-tenant: checks the key exists globally.
274+
pub async fn validate_key(
275+
&self,
276+
api_key_value: &str,
277+
tenant_id: &Option<String>,
278+
required_type: KeyType,
279+
) -> bool {
280+
let map = self.keys.read().await;
281+
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
282+
if let Some(tenant_keys) = map.get(tenant) {
283+
return tenant_keys
284+
.values()
285+
.any(|k| k.api_key == api_key_value && k.key_type == required_type);
286+
}
287+
false
288+
}
289+
290+
/// Insert an API key directly into memory (used for sync from prism)
291+
pub async fn sync_put(&self, api_key: ApiKey) {
292+
let tenant = api_key
293+
.tenant
294+
.as_deref()
295+
.unwrap_or(DEFAULT_TENANT)
296+
.to_owned();
297+
let mut map = self.keys.write().await;
298+
map.entry(tenant)
299+
.or_default()
300+
.insert(api_key.key_id, api_key);
301+
}
302+
303+
/// Remove an API key from memory (used for sync from prism)
304+
pub async fn sync_delete(&self, key_id: &Ulid, tenant_id: &Option<String>) {
305+
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
306+
let mut map = self.keys.write().await;
307+
if let Some(tenant_keys) = map.get_mut(tenant) {
308+
tenant_keys.remove(key_id);
309+
}
310+
}
311+
}
312+
313+
#[derive(Debug, thiserror::Error)]
314+
pub enum ApiKeyError {
315+
#[error("API key not found: {0}")]
316+
KeyNotFound(String),
317+
318+
#[error("Duplicate key name: {0}")]
319+
DuplicateKeyName(String),
320+
321+
#[error("Unauthorized: {0}")]
322+
Unauthorized(String),
323+
324+
#[error("{0}")]
325+
MetastoreError(#[from] crate::metastore::MetastoreError),
326+
327+
#[error("{0}")]
328+
AnyhowError(#[from] anyhow::Error),
329+
}
330+
331+
impl actix_web::ResponseError for ApiKeyError {
332+
fn status_code(&self) -> actix_web::http::StatusCode {
333+
match self {
334+
ApiKeyError::KeyNotFound(_) => actix_web::http::StatusCode::NOT_FOUND,
335+
ApiKeyError::DuplicateKeyName(_) => actix_web::http::StatusCode::CONFLICT,
336+
ApiKeyError::Unauthorized(_) => actix_web::http::StatusCode::FORBIDDEN,
337+
ApiKeyError::MetastoreError(_) | ApiKeyError::AnyhowError(_) => {
338+
actix_web::http::StatusCode::INTERNAL_SERVER_ERROR
339+
}
340+
}
341+
}
342+
}

0 commit comments

Comments
 (0)