|
19 | 19 | import requests |
20 | 20 |
|
21 | 21 | from vulnerabilities.models import AdvisoryAlias |
| 22 | +from vulnerabilities.models import AdvisoryV2 |
22 | 23 | from vulnerabilities.models import DetectionRule |
23 | 24 | from vulnerabilities.models import DetectionRuleTypes |
24 | 25 | from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2 |
25 | | -from vulnerabilities.utils import find_all_cve |
| 26 | +from vulnerabilities.utils import find_all_cve_rule |
26 | 27 |
|
27 | 28 |
|
28 | 29 | def extract_cvd(cvd_path, output_dir): |
@@ -93,13 +94,6 @@ def parse_hdb_file(hdb_path: Path) -> List[dict]: |
93 | 94 | return signatures |
94 | 95 |
|
95 | 96 |
|
96 | | -def extract_cve_id(name: str): |
97 | | - """Normalize underscores and extract the first CVE ID from a string, or None.""" |
98 | | - normalized = name.replace("_", "-") |
99 | | - cves = [cve.upper() for cve in find_all_cve(normalized)] |
100 | | - return cves[0] if cves else None |
101 | | - |
102 | | - |
103 | 97 | class ClamVRulesImproverPipeline(VulnerableCodeBaseImporterPipelineV2): |
104 | 98 | """ |
105 | 99 | Pipeline that downloads ClamAV database (main.cvd), extracts signatures, |
@@ -159,36 +153,27 @@ def collect_and_store_advisories(self): |
159 | 153 | self.extract_cvd_dir / "main.ndb" |
160 | 154 | ): |
161 | 155 | name = rule_entry.get("name", "") |
162 | | - cve_id = extract_cve_id(name) |
163 | | - found_advisories = set() |
164 | | - |
165 | | - if cve_id: |
166 | | - try: |
167 | | - if alias := AdvisoryAlias.objects.get(alias=cve_id): |
168 | | - for adv in alias.advisories.all(): |
169 | | - found_advisories.add(adv) |
170 | | - except AdvisoryAlias.DoesNotExist: |
171 | | - self.log(f"Advisory {cve_id} not found.") |
172 | | - |
173 | | - for adv in found_advisories: |
174 | | - DetectionRule.objects.update_or_create( |
175 | | - rule_text=str(rule_entry), |
176 | | - rule_type=DetectionRuleTypes.CLAMAV, |
177 | | - advisory=adv, |
178 | | - defaults={ |
179 | | - "source_url": self.MAIN_DATABASE_URL, |
180 | | - }, |
181 | | - ) |
182 | | - |
183 | | - if not found_advisories: |
184 | | - DetectionRule.objects.update_or_create( |
185 | | - rule_text=str(rule_entry), |
186 | | - rule_type=DetectionRuleTypes.CLAMAV, |
187 | | - advisory=None, |
188 | | - defaults={ |
189 | | - "source_url": self.MAIN_DATABASE_URL, |
190 | | - }, |
191 | | - ) |
| 156 | + cve_ids = find_all_cve_rule(name) |
| 157 | + |
| 158 | + advisories = set() |
| 159 | + for cve_id in cve_ids: |
| 160 | + alias = AdvisoryAlias.objects.filter(alias=cve_id).first() |
| 161 | + if alias: |
| 162 | + for adv in alias.advisories.all(): |
| 163 | + advisories.add(adv) |
| 164 | + else: |
| 165 | + advs = AdvisoryV2.objects.filter(advisory_id=cve_id) |
| 166 | + for adv in advs: |
| 167 | + advisories.add(adv) |
| 168 | + |
| 169 | + detection_rule, _ = DetectionRule.objects.get_or_create( |
| 170 | + rule_text=str(rule_entry), |
| 171 | + rule_type=DetectionRuleTypes.CLAMAV, |
| 172 | + source_url=self.MAIN_DATABASE_URL, |
| 173 | + ) |
| 174 | + |
| 175 | + for adv in advisories: |
| 176 | + detection_rule.related_advisories.add(adv) |
192 | 177 |
|
193 | 178 | def clean_downloads(self): |
194 | 179 | """Clean up downloaded files.""" |
|
0 commit comments