-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpytest_sqlfluff.py
More file actions
181 lines (148 loc) · 6.33 KB
/
pytest_sqlfluff.py
File metadata and controls
181 lines (148 loc) · 6.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
"""
Runs SQLFluff tests. https://github.com/sqlfluff/sqlfluff
We realise this as a "proper" pytest plugin since it gives us some advantages
over the naive method of creating Python tests per SQL file that call
sqlfluff.lint():
* With the naive method, we would have to manually make sure that we test
each and every SQL file. For instance, we have pytest.parametrize lists for
all views. Cool. But what about data consistency checks? What about all the
future kinds of SQL files we might introduce? On the other hand, pytest
plugins iterate over all files, independent of their intended usage in
the Python code.
* This also makes the integration with .sqlfluffignore a breeze since we simply
compare file paths.
* Ultimately, the only advantages that specialised tests would have are
not reading SQL files multiple times (irrelevant on modern machines) and nice
test names that we can target with "-k database.table", but this can be
achieved using this plugin and pytest hooks as well.
* Python tests have a specialised error reporting and error presentation tied
to Python. Plugins, on the other hand, can define their own representation
of issues.
* If we feel like it, we can publish this plugin to PyPI.
The code is inspired by pytest-flake8, which is probably copy&pasted from other
existing plugins.
"""
import hashlib
import json
from copy import deepcopy
from dataclasses import dataclass
from operator import attrgetter
from pathlib import Path
from typing import Any
import pytest
from _pytest._code import ExceptionInfo
from _pytest._code.code import TerminalRepr
from sqlfluff import __version__ as sqlfluff_version
from sqlfluff.core import FluffConfig, Linter
from sqlfluff.core.linter import LintedFile
from sqlfluff.core.linter.discovery import paths_from_path
from matviews.hashing import sha256sum_file
@dataclass
class RunContext:
    """ Per-session state that pytest_configure hands to the collect hook. """
    # linter instance passed on to every collected SQL file
    linter: Linter
    # file paths eligible for linting; pytest_collect_file compares
    # str(file_path) against the entries of this list
    paths: list[str]
    # digest produced by hash_config() for the linter's configuration
    config_hash: str
# stash key under which pytest_configure stores the RunContext on the pytest
# config object; the entry only exists when --sqlfluff was given
CONTEXT_KEY = pytest.StashKey[RunContext]()
class SqlFluffError(Exception):
    """ indicates an error during SQLFluff checks. """

    def __init__(self, result: LintedFile) -> None:
        """ Keep the linted file so that __str__ can render its violations. """
        self.result = result

    def __str__(self) -> str:
        """
        Render one line per violation ("line:pos:code: description"),
        followed by a hint how to auto-fix if any violation is fixable.
        """
        tmpl = '{start_line_no:>4}:{start_line_pos:>3}:{code}: {description}'
        # sort by position within the line as well, so that several
        # violations on the same line are reported left to right instead of
        # in whatever order the linter emitted them
        violations = sorted(
            self.result.get_violations(),
            key=attrgetter('line_no', 'line_pos'))
        msg = '\n'.join(tmpl.format(**x.to_dict()) for x in violations)
        if fixable := len(self.result.get_violations(fixable=True)):
            msg = (
                f'{msg}\n\nThere are {fixable} automatically fixable issues. '
                f'Run following command to try to fix them:\n    '
                f'docker compose run --entrypoint sh test -c "'
                f'sqlfluff fix {self.result.path}"')
        return msg
class SqlFluffFile(pytest.File):
    """ Collector for a single SQL file; yields exactly one lint item. """

    def __init__(
        self, linter: Linter, config_hash: str, *args: Any, **kwargs: Any,
    ) -> None:
        """ Remember linter and config digest, then defer to pytest.File. """
        self.linter = linter
        self.config_hash = config_hash
        super().__init__(*args, **kwargs)

    def collect(self) -> list['SqlFluffItem']:
        """ Return the one 'sqlfluff' item that lints this file. """
        item = SqlFluffItem.from_parent(
            self,
            name='sqlfluff',
            linter=self.linter,
            config_hash=self.config_hash,
        )
        return [item]
class SqlFluffItem(pytest.Item):
    """
    Test item that lints one SQL file with SQLFluff.

    If pytest's cache is available, a file that passed before is skipped as
    long as neither its content hash nor the config digest has changed.
    """

    def __init__(
        self, linter: Linter, config_hash: str, *args: Any, **kwargs: Any,
    ) -> None:
        """ Store linter and config digest and mark the item as 'sqlfluff'. """
        self.linter = linter
        self.config_hash = config_hash
        super().__init__(*args, **kwargs)
        self.add_marker('sqlfluff')
        # content hash of the file; filled in by setup() when caching is on
        self._file_hash: str | None = None

    @property
    def _cache(self) -> Any:
        """ pytest's cache object or None if the cache is unavailable. """
        # NOTE(review): pytest appears to set ``config.cache`` only from its
        # cacheprovider plugin, so plain attribute access would fail with
        # "-p no:cacheprovider" - confirm against the pytest version in use
        return getattr(self.config, 'cache', None)

    @property
    def cache_key(self) -> str:
        """ Cache key derived from the file's content hash. """
        return f'sqlfluff/{self._file_hash}'

    def setup(self) -> None:
        """ Skip the item if this exact file already passed with this config. """
        cache = self._cache
        if cache is None:
            return
        self._file_hash = sha256sum_file(self.path)
        if cache.get(self.cache_key, '') == self.config_hash:
            pytest.skip('file(s) previously passed SQLFLUFF checks')

    def runtest(self) -> None:
        """
        Lint the file and raise SqlFluffError if there is any violation.

        Raises:
            SqlFluffError: the linter reported at least one violation.
        """
        result = self.linter.lint_string(
            self.path.read_text(), fname=str(self.path))
        if result.get_violations():
            raise SqlFluffError(result)
        # update file hash only if test passed
        # otherwise failures would not be re-run next time
        cache = self._cache
        if cache is not None and self._file_hash is not None:
            cache.set(self.cache_key, self.config_hash)

    def repr_failure(
        self,
        excinfo: ExceptionInfo[BaseException],
        style: Any = None,
    ) -> str | TerminalRepr:
        """ Show lint violations as plain text, anything else pytest's way. """
        if isinstance(excinfo.value, SqlFluffError):
            return str(excinfo.value)
        # hand the requested traceback style on instead of dropping it
        return super().repr_failure(excinfo, style)

    def reportinfo(self) -> tuple[Path, int | None, str]:
        """ File path, line number and name of the test """
        return self.path, None, self.name
def hash_config(config: FluffConfig) -> str:
    """
    Return a SHA-256 hex digest of the SQLFluff configuration and version.

    The digest is what gets stored in pytest's cache for files that passed,
    so a changed config value or SQLFluff version yields a different digest.
    """
    # objects that json cannot serialise; they are left out of the digest
    unhashable = ('dialect_obj', 'templater_obj')
    # shallow copies are enough since the values are only read for
    # serialisation; this avoids deep-copying the dialect/templater objects
    config_values = dict(config._configs)
    config_values['core'] = {
        key: value
        for key, value in config_values.get('core', {}).items()
        if key not in unhashable
    }
    config_values['__version__'] = sqlfluff_version
    # sort keys so that the digest does not depend on the order in which
    # the configuration happened to be assembled
    config_str = json.dumps(config_values, sort_keys=True)
    return hashlib.sha256(config_str.encode('utf-8')).hexdigest()
def pytest_addoption(parser: pytest.Parser) -> None:
    """ Register the --sqlfluff flag in pytest's 'general' option group. """
    parser.getgroup('general').addoption(
        '--sqlfluff',
        action='store_true',
        help='perform some sqlfluff sanity checks on .sql files',
    )
def pytest_configure(config: pytest.Config) -> None:
    """
    Register the 'sqlfluff' marker and, if --sqlfluff was given, prepare
    the linter, the list of lintable paths and the config digest.
    """
    # register the marker whether or not --sqlfluff is given, so that it is
    # always known to pytest (e.g. listed by "pytest --markers")
    config.addinivalue_line(
        'markers', 'sqlfluff: Tests which run sqlfluff.')
    if not config.option.sqlfluff:
        return

    linter = Linter(config=FluffConfig.from_root())
    # comma-separated list of file extensions from the SQLFluff config
    extensions = linter.config.get(
        'sql_file_exts', default='.sql').lower().split(',')
    config.stash[CONTEXT_KEY] = RunContext(
        linter=linter,
        paths=paths_from_path(
            str(config.rootpath), target_file_exts=extensions),
        config_hash=hash_config(linter.config),
    )
def pytest_collect_file(
    file_path: Path, parent: pytest.Collector,
) -> SqlFluffFile | None:
    """ Create a collector for file_path if it is one of the lintable files. """
    pytest_config = parent.config
    if not pytest_config.option.sqlfluff:
        return None

    context = pytest_config.stash[CONTEXT_KEY]
    if str(file_path) not in context.paths:
        return None

    return SqlFluffFile.from_parent(
        parent,
        path=file_path,
        linter=context.linter,
        config_hash=context.config_hash,
    )