Skip to content

Commit 97074bf

Browse files
authored
Merge pull request #33 from theafolayan/theafolayan/create-twitter-polling-jobs-and-dm-service
Implement Twitter follower polling and DM automation
2 parents 7043544 + d865650 commit 97074bf

18 files changed

Lines changed: 919 additions & 2 deletions

app/Console/Kernel.php

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,30 @@
44

55
namespace App\Console;
66

7+
use App\Jobs\Twitter\DispatchFollowerPolling;
78
use Illuminate\Console\Scheduling\Schedule;
89
use Illuminate\Foundation\Console\Kernel as ConsoleKernel;
910

1011
class Kernel extends ConsoleKernel
1112
{
1213
protected function schedule(Schedule $schedule): void
1314
{
14-
//
15+
$connection = config('twitter.queues.connection', 'redis');
16+
$queue = config('twitter.queues.poll_queue', 'twitter-polling');
17+
18+
foreach (config('twitter.polling.plans', []) as $plan => $interval) {
19+
$interval = (int) $interval;
20+
if ($interval <= 0) {
21+
continue;
22+
}
23+
24+
$schedule->job(new DispatchFollowerPolling($plan))
25+
->cron(sprintf('*/%d * * * *', $interval))
26+
->onConnection($connection)
27+
->onQueue($queue)
28+
->withoutOverlapping()
29+
->name(sprintf('twitter-followers-polling-%s', strtolower($plan)));
30+
}
1531
}
1632

1733
protected function commands(): void
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace App\Jobs\Twitter;
6+
7+
use App\Models\TwitterAccount;
8+
use Illuminate\Bus\Queueable;
9+
use Illuminate\Contracts\Queue\ShouldQueue;
10+
use Illuminate\Foundation\Bus\Dispatchable;
11+
use Illuminate\Queue\InteractsWithQueue;
12+
use Illuminate\Queue\SerializesModels;
13+
use Illuminate\Support\Facades\Log;
14+
15+
class DispatchFollowerPolling implements ShouldQueue
16+
{
17+
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
18+
19+
public function __construct(public string $planName)
20+
{
21+
$this->connection = config('twitter.queues.connection', 'redis');
22+
$this->queue = config('twitter.queues.poll_queue', 'twitter-polling');
23+
$this->onConnection($this->connection);
24+
$this->onQueue($this->queue);
25+
}
26+
27+
public function handle(): void
28+
{
29+
$accounts = TwitterAccount::query()
30+
->whereNull('disconnected_at')
31+
->whereHas('workspace.account.plan', function ($query): void {
32+
$query->where('name', $this->planName);
33+
})
34+
->get();
35+
36+
if ($accounts->isEmpty()) {
37+
return;
38+
}
39+
40+
foreach ($accounts as $account) {
41+
if (empty($account->access_token)) {
42+
Log::warning('Skipping Twitter follower polling for account without access token', [
43+
'twitter_account_id' => $account->id,
44+
]);
45+
continue;
46+
}
47+
48+
PollFollowers::dispatch($account)->onConnection($this->connection)->onQueue($this->queue);
49+
}
50+
}
51+
}

app/Jobs/Twitter/PollFollowers.php

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace App\Jobs\Twitter;
6+
7+
use App\Models\TwitterAccount;
8+
use App\Models\TwitterFollower;
9+
use Illuminate\Bus\Queueable;
10+
use Illuminate\Contracts\Queue\ShouldQueue;
11+
use Illuminate\Foundation\Bus\Dispatchable;
12+
use Illuminate\Queue\InteractsWithQueue;
13+
use Illuminate\Queue\SerializesModels;
14+
use Illuminate\Support\Carbon;
15+
use Illuminate\Support\Facades\Http;
16+
use Illuminate\Support\Facades\Log;
17+
18+
class PollFollowers implements ShouldQueue
19+
{
20+
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
21+
22+
public function __construct(public TwitterAccount $twitterAccount)
23+
{
24+
$this->connection = config('twitter.queues.connection', 'redis');
25+
$this->queue = config('twitter.queues.poll_queue', 'twitter-polling');
26+
$this->onConnection($this->connection);
27+
$this->onQueue($this->queue);
28+
}
29+
30+
public function handle(): void
31+
{
32+
$account = $this->twitterAccount->fresh();
33+
if ($account === null || $account->disconnected_at !== null) {
34+
return;
35+
}
36+
37+
$token = $account->access_token;
38+
if ($token === null) {
39+
Log::warning('Twitter polling skipped due to missing access token', [
40+
'twitter_account_id' => $account->id,
41+
]);
42+
return;
43+
}
44+
45+
$url = sprintf('%s/2/users/%s/followers', config('twitter.base_url'), $account->twitter_id);
46+
$paginationToken = null;
47+
$welcomeMessage = config('twitter.dm.welcome_message');
48+
$queueConnection = config('twitter.queues.connection', 'redis');
49+
$dmQueue = config('twitter.queues.dm_queue', 'twitter-dm');
50+
51+
do {
52+
$query = [
53+
'max_results' => config('twitter.polling.page_size'),
54+
'user.fields' => 'id,name,username',
55+
];
56+
57+
if ($paginationToken !== null) {
58+
$query['pagination_token'] = $paginationToken;
59+
}
60+
61+
$response = Http::withToken($token)
62+
->acceptJson()
63+
->get($url, $query);
64+
65+
if ($response->failed()) {
66+
Log::error('Twitter followers API request failed', [
67+
'twitter_account_id' => $account->id,
68+
'status' => $response->status(),
69+
'body' => $response->json(),
70+
]);
71+
72+
return;
73+
}
74+
75+
$followers = $response->json('data', []);
76+
if (! is_array($followers)) {
77+
Log::warning('Twitter followers API returned unexpected payload', [
78+
'twitter_account_id' => $account->id,
79+
]);
80+
81+
return;
82+
}
83+
84+
foreach ($followers as $follower) {
85+
$followerId = $follower['id'] ?? null;
86+
if ($followerId === null) {
87+
continue;
88+
}
89+
90+
$record = TwitterFollower::query()->firstOrCreate(
91+
[
92+
'twitter_account_id' => $account->id,
93+
'follower_id' => $followerId,
94+
],
95+
[
96+
'username' => $follower['username'] ?? null,
97+
'name' => $follower['name'] ?? null,
98+
'seen_at' => Carbon::now(),
99+
],
100+
);
101+
102+
if (! $record->wasRecentlyCreated) {
103+
continue;
104+
}
105+
106+
SendDirectMessage::dispatch(
107+
$account,
108+
$followerId,
109+
$welcomeMessage,
110+
)->onConnection($queueConnection)
111+
->onQueue($dmQueue);
112+
}
113+
114+
$paginationToken = $response->json('meta.next_token');
115+
} while ($paginationToken !== null);
116+
117+
$account->forceFill([
118+
'last_polled_at' => Carbon::now(),
119+
])->save();
120+
}
121+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace App\Jobs\Twitter;
6+
7+
use App\Models\TwitterAccount;
8+
use App\Services\Twitter\DirectMessageSender;
9+
use App\Services\Twitter\Exceptions\RateLimitException;
10+
use Illuminate\Bus\Queueable;
11+
use Illuminate\Contracts\Queue\ShouldQueue;
12+
use Illuminate\Foundation\Bus\Dispatchable;
13+
use Illuminate\Queue\InteractsWithQueue;
14+
use Illuminate\Queue\SerializesModels;
15+
use Illuminate\Support\Facades\Log;
16+
17+
class SendDirectMessage implements ShouldQueue
18+
{
19+
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
20+
21+
public function __construct(
22+
public TwitterAccount $twitterAccount,
23+
public string $participantId,
24+
public string $message,
25+
) {
26+
$this->connection = config('twitter.queues.connection', 'redis');
27+
$this->queue = config('twitter.queues.dm_queue', 'twitter-dm');
28+
$this->onConnection($this->connection);
29+
$this->onQueue($this->queue);
30+
}
31+
32+
public function handle(DirectMessageSender $sender): void
33+
{
34+
$account = $this->twitterAccount->fresh();
35+
if ($account === null || $account->disconnected_at !== null) {
36+
return;
37+
}
38+
39+
try {
40+
$sender->send($account, $this->participantId, $this->message);
41+
} catch (RateLimitException $exception) {
42+
$delay = max(1, $exception->retryAfter ?? config('twitter.dm.backoff.default_retry_after'));
43+
$this->release($delay);
44+
} catch (\Throwable $exception) {
45+
Log::error('Failed to send Twitter direct message', [
46+
'twitter_account_id' => $account->id,
47+
'participant_id' => $this->participantId,
48+
'error' => $exception->getMessage(),
49+
]);
50+
throw $exception;
51+
}
52+
}
53+
}

app/Models/TwitterAccount.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use Illuminate\Database\Eloquent\Factories\HasFactory;
1010
use Illuminate\Database\Eloquent\Model;
1111
use Illuminate\Database\Eloquent\Relations\BelongsTo;
12+
use Illuminate\Database\Eloquent\Relations\HasMany;
1213

1314
class TwitterAccount extends Model
1415
{
@@ -21,6 +22,7 @@ class TwitterAccount extends Model
2122
'token_expires_at' => 'datetime',
2223
'connected_at' => 'datetime',
2324
'disconnected_at' => 'datetime',
25+
'last_polled_at' => 'datetime',
2426
];
2527

2628
protected $hidden = [
@@ -43,6 +45,11 @@ public function user(): BelongsTo
4345
return $this->belongsTo(User::class);
4446
}
4547

48+
public function followers(): HasMany
49+
{
50+
return $this->hasMany(TwitterFollower::class);
51+
}
52+
4653
protected function accessToken(): Attribute
4754
{
4855
return Attribute::make(

app/Models/TwitterFollower.php

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace App\Models;
6+
7+
use Illuminate\Database\Eloquent\Factories\HasFactory;
8+
use Illuminate\Database\Eloquent\Model;
9+
use Illuminate\Database\Eloquent\Relations\BelongsTo;
10+
11+
class TwitterFollower extends Model
12+
{
13+
use HasFactory;
14+
15+
protected $guarded = [];
16+
17+
protected $casts = [
18+
'seen_at' => 'datetime',
19+
];
20+
21+
public function account(): BelongsTo
22+
{
23+
return $this->belongsTo(TwitterAccount::class, 'twitter_account_id');
24+
}
25+
}

app/Services/RateLimiter/TokenBucketLimiter.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@ public function __construct($client = null)
2020

2121
public function consume(int $workspaceId, int $perMinute, int $tokens = 1): bool
2222
{
23-
$key = "rate:" . $workspaceId;
23+
return $this->consumeKey("rate:" . $workspaceId, $perMinute, $tokens);
24+
}
25+
26+
public function consumeKey(string $key, int $perMinute, int $tokens = 1): bool
27+
{
2428
$now = microtime(true);
2529

2630
try {

0 commit comments

Comments
 (0)