Skip to content

Commit cccb31b

Browse files
committed
Upgrade to new airflow.
1 parent 8481c58 commit cccb31b

2 files changed

Lines changed: 49 additions & 23 deletions

File tree

airflow_plugins/operators/sensors/file_sensor.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import time
44
from datetime import datetime, timedelta
55

6+
import pytz
67
from airflow.exceptions import (
78
AirflowException,
89
AirflowSensorTimeout,
@@ -12,6 +13,7 @@
1213
from airflow.operators.sensors import BaseSensorOperator
1314
from airflow.utils.decorators import apply_defaults
1415
from pytz import timezone
16+
from pytz.exceptions import UnknownTimeZoneError
1517

1618
from airflow_plugins.hooks import FTPHook
1719
from airflow_plugins.operators import FileOperator
@@ -66,7 +68,14 @@ def _send_notification(self, context, success=False):
6668
send_notification(ti.get_dagrun(), text, title, color)
6769
return
6870

69-
runtime = datetime.now() - ti.start_date
71+
try:
72+
tz = timezone(ti.start_date.tm_zone)
73+
except (AttributeError, UnknownTimeZoneError): # tm_zone not set on ti.start_date
74+
runtime = datetime.now() - ti.start_date
75+
else:
76+
runtime = datetime.utcnow().replace(
77+
tzinfo=tz) - ti.start_date
78+
7079
if runtime >= self.notify_after:
7180
if (self.last_notification is None or
7281
runtime >= self.last_notification + self.notify_delta):
@@ -149,6 +158,7 @@ def execute(self, context):
149158
# notify about success in case of previous warnings
150159
self._send_notification(context, success=True)
151160
logging.info('Success criteria met. Exiting.')
161+
self._send_notification(context, success=False)
152162
return poke_result
153163

154164
def poke(self, context):
@@ -196,6 +206,10 @@ def poke(self, context):
196206

197207
def get_last_modified(fileobj):
198208
timestamp = fileobj.last_modified
209+
210+
if isinstance(timestamp, datetime):
211+
return timestamp
212+
199213
tformat = '%a, %d %b %Y %H:%M:%S %Z'
200214
dt = datetime.strptime(timestamp, tformat)
201215
t = time.strptime(timestamp, tformat)

tests/operators/sensors/test_file_sensor.py

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
from datetime import datetime, timedelta
22
from time import sleep
33

4-
import boto
4+
import boto3
55
import pytest
66
from airflow.exceptions import AirflowException
7-
from boto.s3.key import Key
7+
# from boto3.s3.key import Key
88
from mock import Mock
99
from moto import mock_s3
1010

@@ -46,13 +46,14 @@ def test_files_sensor_fail_on_unsupported_connection():
4646

4747
@mock_s3
4848
def test_files_on_s3():
49-
conn = boto.connect_s3()
50-
bucket = conn.create_bucket('storiesbi-datapipeline')
49+
conn = boto3.resource('s3')
50+
conn.create_bucket(Bucket='storiesbi-datapipeline')
51+
5152
get_or_update_conn("s3.stories.bi", conn_type="s3")
5253

5354
file_sensor = FileSensor(
5455
task_id="check_new_file",
55-
path="foo",
56+
path="s3://storiesbi-datapipeline/foo",
5657
conn_id="s3.stories.bi",
5758
modified="anytime"
5859
)
@@ -61,27 +62,22 @@ def test_files_on_s3():
6162

6263
assert not file_sensor.poke(ctx)
6364

64-
k = Key(bucket)
65-
k.key = "foo"
66-
k.set_contents_from_string("bar")
65+
conn.Object('storiesbi-datapipeline', 'foo').put(Body="bar")
6766

6867
assert file_sensor.poke(ctx)
6968

7069

7170
@mock_s3
7271
def test_files_on_s3_modified_after():
73-
conn = boto.connect_s3()
74-
bucket = conn.create_bucket('storiesbi-datapipeline')
75-
76-
k = Key(bucket)
77-
k.key = "foo"
78-
k.set_contents_from_string("bar")
72+
conn = boto3.resource('s3')
73+
conn.create_bucket(Bucket='storiesbi-datapipeline')
74+
conn.Object('storiesbi-datapipeline', 'foo').put(Body="bar")
7975

8076
get_or_update_conn("s3.stories.bi", conn_type="s3")
8177

8278
file_sensor = FileSensor(
8379
task_id="check_new_file",
84-
path="foo",
80+
path="s3://storiesbi-datapipeline/foo",
8581
conn_id="s3.stories.bi",
8682
modified=datetime.now()
8783
)
@@ -92,19 +88,16 @@ def test_files_on_s3_modified_after():
9288

9389
# Hacky hacky!
9490
sleep(1)
95-
key = bucket.get_key("foo")
96-
key.set_contents_from_string("baz")
91+
conn.Object('storiesbi-datapipeline', 'foo').put(Body="baz")
9792

9893
assert file_sensor.poke(ctx)
9994

10095

10196
@mock_s3
10297
def test_files_on_s3_from_custom_bucket_defined_in_path():
103-
conn = boto.connect_s3()
104-
bucket = conn.create_bucket('testing')
105-
k = Key(bucket)
106-
k.key = "foo"
107-
k.set_contents_from_string("baz")
98+
conn = boto3.resource('s3')
99+
conn.create_bucket(Bucket='testing')
100+
conn.Object('testing', 'foo').put(Body="baz")
108101

109102
get_or_update_conn("s3.stories.bi", conn_type="s3")
110103
yesterday = datetime.now() - timedelta(1)
@@ -119,3 +112,22 @@ def test_files_on_s3_from_custom_bucket_defined_in_path():
119112
file_sensor.pre_execute(ctx)
120113

121114
assert file_sensor.poke(ctx)
115+
116+
117+
@mock_s3
118+
def test_operator_notification():
119+
conn = boto3.resource('s3')
120+
conn.create_bucket(Bucket='testing')
121+
conn.Object('testing', 'foo').put(Body="baz")
122+
123+
get_or_update_conn("s3.stories.bi", conn_type="s3")
124+
yesterday = datetime.now() - timedelta(1)
125+
126+
file_sensor = FileSensor(
127+
task_id="check_new_file",
128+
path="s3://testing/foo",
129+
conn_id="s3.stories.bi",
130+
modified=yesterday
131+
)
132+
133+
file_sensor._send_notification(ctx, success=False)

0 commit comments

Comments
 (0)