
Commit 8b7be85

Upgrade to new airflow.

1 parent: 8481c58

2 files changed

Lines changed: 19 additions & 22 deletions


airflow_plugins/operators/sensors/file_sensor.py

Lines changed: 4 additions & 0 deletions
@@ -196,6 +196,10 @@ def poke(self, context):
 
 def get_last_modified(fileobj):
     timestamp = fileobj.last_modified
+
+    if isinstance(timestamp, datetime):
+        return timestamp
+
     tformat = '%a, %d %b %Y %H:%M:%S %Z'
     dt = datetime.strptime(timestamp, tformat)
     t = time.strptime(timestamp, tformat)
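The new guard reflects the type change that comes with the boto3 migration: boto 2 exposed Key.last_modified as a string, while boto3 exposes Object.last_modified as a timezone-aware datetime. A minimal sketch of that difference, using made-up stand-in classes rather than real S3 objects:

# Illustrative only: stand-ins for the two shapes last_modified can take.
from datetime import datetime, timezone

class Boto2LikeFile:
    last_modified = 'Wed, 12 Oct 2016 14:23:05 GMT'          # old string form

class Boto3LikeFile:
    last_modified = datetime(2016, 10, 12, 14, 23, 5, tzinfo=timezone.utc)

for fileobj in (Boto2LikeFile(), Boto3LikeFile()):
    ts = fileobj.last_modified
    if isinstance(ts, datetime):
        print('already a datetime:', ts)                      # boto3 path
    else:
        tformat = '%a, %d %b %Y %H:%M:%S %Z'
        print('parsed from string:', datetime.strptime(ts, tformat))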

tests/operators/sensors/test_file_sensor.py

Lines changed: 15 additions & 22 deletions
@@ -1,10 +1,10 @@
 from datetime import datetime, timedelta
 from time import sleep
 
-import boto
+import boto3
 import pytest
 from airflow.exceptions import AirflowException
-from boto.s3.key import Key
+# from boto3.s3.key import Key
 from mock import Mock
 from moto import mock_s3
 
@@ -46,13 +46,14 @@ def test_files_sensor_fail_on_unsupported_connection():
 
 @mock_s3
 def test_files_on_s3():
-    conn = boto.connect_s3()
-    bucket = conn.create_bucket('storiesbi-datapipeline')
+    conn = boto3.resource('s3')
+    conn.create_bucket(Bucket='storiesbi-datapipeline')
+
     get_or_update_conn("s3.stories.bi", conn_type="s3")
 
     file_sensor = FileSensor(
         task_id="check_new_file",
-        path="foo",
+        path="s3://storiesbi-datapipeline/foo",
         conn_id="s3.stories.bi",
         modified="anytime"
     )
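The tests now follow the boto3 resource API throughout: boto3 has no Key class (hence the commented-out import above), so objects are written and overwritten through Object(...).put(...). A self-contained sketch of that pattern under moto, assuming the same moto version as these tests (one that still provides mock_s3); the bucket and key names below are placeholders, not part of the real suite:

# Hedged sketch of the boto3-under-moto write pattern the updated tests use.
import boto3
from moto import mock_s3

@mock_s3
def demo_put_and_read():
    s3 = boto3.resource('s3')
    s3.create_bucket(Bucket='example-bucket')

    # boto 2 style (removed): k = Key(bucket); k.key = 'foo'; k.set_contents_from_string('bar')
    # boto3 style (added):
    s3.Object('example-bucket', 'foo').put(Body=b'bar')

    # Overwriting is just another put on the same key.
    s3.Object('example-bucket', 'foo').put(Body=b'baz')
    return s3.Object('example-bucket', 'foo').get()['Body'].read()

print(demo_put_and_read())   # b'baz'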
@@ -61,27 +62,22 @@ def test_files_on_s3():
 
     assert not file_sensor.poke(ctx)
 
-    k = Key(bucket)
-    k.key = "foo"
-    k.set_contents_from_string("bar")
+    conn.Object('storiesbi-datapipeline', 'foo').put(Body="bar")
 
     assert file_sensor.poke(ctx)
 
 
 @mock_s3
 def test_files_on_s3_modified_after():
-    conn = boto.connect_s3()
-    bucket = conn.create_bucket('storiesbi-datapipeline')
-
-    k = Key(bucket)
-    k.key = "foo"
-    k.set_contents_from_string("bar")
+    conn = boto3.resource('s3')
+    conn.create_bucket(Bucket='storiesbi-datapipeline')
+    conn.Object('storiesbi-datapipeline', 'foo').put(Body="bar")
 
     get_or_update_conn("s3.stories.bi", conn_type="s3")
 
     file_sensor = FileSensor(
         task_id="check_new_file",
-        path="foo",
+        path="s3://storiesbi-datapipeline/foo",
         conn_id="s3.stories.bi",
         modified=datetime.now()
     )
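The two tests exercise the two "modified" modes: "anytime" passes as soon as the object exists, while a datetime threshold only passes once the object has been modified after it. A small sketch of that comparison; the real FileSensor logic is outside this diff, so the helper below is purely illustrative:

# Illustrative only: mirrors the "modified" semantics the tests imply,
# not the actual FileSensor implementation.
from datetime import datetime, timezone

def is_satisfied(last_modified, modified):
    if modified == "anytime":
        return True
    # boto3 returns tz-aware datetimes; treat a naive threshold as UTC here.
    threshold = modified if modified.tzinfo else modified.replace(tzinfo=timezone.utc)
    return last_modified > threshold

lm = datetime(2024, 1, 2, tzinfo=timezone.utc)
print(is_satisfied(lm, "anytime"))             # True
print(is_satisfied(lm, datetime(2024, 1, 1)))  # True  (modified after threshold)
print(is_satisfied(lm, datetime(2024, 1, 3)))  # False (not modified since)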
@@ -92,19 +88,16 @@ def test_files_on_s3_modified_after():
 
     # Hacky hacky!
     sleep(1)
-    key = bucket.get_key("foo")
-    key.set_contents_from_string("baz")
+    conn.Object('storiesbi-datapipeline', 'foo').put(Body="baz")
 
     assert file_sensor.poke(ctx)
 
 
 @mock_s3
 def test_files_on_s3_from_custom_bucket_defined_in_path():
-    conn = boto.connect_s3()
-    bucket = conn.create_bucket('testing')
-    k = Key(bucket)
-    k.key = "foo"
-    k.set_contents_from_string("baz")
+    conn = boto3.resource('s3')
+    conn.create_bucket(Bucket='testing')
+    conn.Object('testing', 'foo').put(Body="baz")
 
     get_or_update_conn("s3.stories.bi", conn_type="s3")
     yesterday = datetime.now() - timedelta(1)
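With the sensor paths now written as full s3:// URIs, the bucket comes from the path itself rather than from a separate setting, which is what the custom-bucket test checks. A hedged sketch of how such a path can be split into bucket and key; the sensor's actual parsing is not shown in this diff, so the helper name and behavior are assumptions:

# Illustrative helper, not the FileSensor's own parser: splits an
# "s3://bucket/key" path into its bucket and key components.
from urllib.parse import urlparse

def split_s3_path(path):
    parsed = urlparse(path)
    if parsed.scheme != 's3':
        raise ValueError('expected an s3:// path, got %r' % path)
    return parsed.netloc, parsed.path.lstrip('/')

print(split_s3_path('s3://testing/foo'))                 # ('testing', 'foo')
print(split_s3_path('s3://storiesbi-datapipeline/foo'))  # ('storiesbi-datapipeline', 'foo')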
