-
-
Notifications
You must be signed in to change notification settings - Fork 125
Expand file tree
/
Copy pathapp.py
More file actions
119 lines (102 loc) · 4.35 KB
/
app.py
File metadata and controls
119 lines (102 loc) · 4.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import json
import base64
import requests
from walkoff_app_sdk.app_base import AppBase
class isMalicious(AppBase):
__version__ = "1.0.0"
app_name = "ismalicious"
def __init__(self, redis, logger, console_logger=None):
super().__init__(redis, logger, console_logger)
def _get_auth_header(self, api_key, api_secret):
"""Generate authentication header."""
credentials = f"{api_key}:{api_secret}"
encoded = base64.b64encode(credentials.encode()).decode()
return {"X-API-KEY": encoded, "Accept": "application/json"}
def _get_base_url(self, api_url=None):
"""Get base URL with fallback to default."""
if api_url and api_url.strip():
return api_url.rstrip("/")
return "https://ismalicious.com"
def check_ip(self, api_key, api_secret, ip, enrichment="standard", api_url=None):
"""Check if an IP address is malicious."""
base_url = self._get_base_url(api_url)
headers = self._get_auth_header(api_key, api_secret)
try:
response = requests.get(
f"{base_url}/api/check",
params={"query": ip, "enrichment": enrichment or "standard"},
headers=headers,
timeout=30,
)
response.raise_for_status()
result = response.json()
return json.dumps({"success": True, **result})
except requests.exceptions.RequestException as e:
return json.dumps({"success": False, "error": str(e)})
def check_domain(
self, api_key, api_secret, domain, enrichment="standard", api_url=None
):
"""Check if a domain is malicious."""
base_url = self._get_base_url(api_url)
headers = self._get_auth_header(api_key, api_secret)
try:
response = requests.get(
f"{base_url}/api/check",
params={"query": domain, "enrichment": enrichment or "standard"},
headers=headers,
timeout=30,
)
response.raise_for_status()
result = response.json()
return json.dumps({"success": True, **result})
except requests.exceptions.RequestException as e:
return json.dumps({"success": False, "error": str(e)})
def get_reputation(self, api_key, api_secret, query, api_url=None):
"""Get reputation data for an IP or domain."""
base_url = self._get_base_url(api_url)
headers = self._get_auth_header(api_key, api_secret)
try:
response = requests.get(
f"{base_url}/api/check/reputation",
params={"query": query},
headers=headers,
timeout=30,
)
response.raise_for_status()
result = response.json()
return json.dumps({"success": True, **result})
except requests.exceptions.RequestException as e:
return json.dumps({"success": False, "error": str(e)})
def get_location(self, api_key, api_secret, ip, api_url=None):
"""Get geolocation data for an IP address."""
base_url = self._get_base_url(api_url)
headers = self._get_auth_header(api_key, api_secret)
try:
response = requests.get(
f"{base_url}/api/check/location",
params={"query": ip},
headers=headers,
timeout=30,
)
response.raise_for_status()
result = response.json()
return json.dumps({"success": True, **result})
except requests.exceptions.RequestException as e:
return json.dumps({"success": False, "error": str(e)})
def get_blocklist_stats(self, api_key, api_secret, api_url=None):
"""Get statistics about available blocklists."""
base_url = self._get_base_url(api_url)
headers = self._get_auth_header(api_key, api_secret)
try:
response = requests.get(
f"{base_url}/api/blocklist/stats",
headers=headers,
timeout=30,
)
response.raise_for_status()
result = response.json()
return json.dumps({"success": True, **result})
except requests.exceptions.RequestException as e:
return json.dumps({"success": False, "error": str(e)})
if __name__ == "__main__":
isMalicious.run()