Работа с API Twitter: использование стримов в реальном времени
() translation by (you can also view the original English article)



Хотя REST API Twitter подходит для многих приложений, если вам нужны мгновенные обновления и доступ к более широкому набору уведомлений, то необходим API Twitter Streaming. Например, только потоковый API скажет вам, когда другой пользователь будет использовать один из ваших твитов.
Использование Streaming API требует постоянного, постоянного подключения между вашим веб-сервером и Twitter. Этот тип реализации может быть незнакомым многим разработчикам PHP. Как только появятся твиты, Twitter уведомляет ваш сервер в режиме реального времени, позволяя вам хранить их в своей базе данных без задержки и опроса REST API. Использование Streaming API также не зависит от ограничений по скорости API в Twitter.
Вот как это работает:



Существует три варианта API-интерфейсов Twitter Streaming:
- Публичный стрим. Он позволяет вашему приложению отслеживать публичные данные в Twitter, например, публичные твиты, фильтры хештэга и т.д.
- Пользовательский стрим. Он позволяет отслеживать поток твитов пользователя в режиме реального времени. Третья часть этой серии будет посвящена пользовательскому потоку.
- Потоки сайтов. Потоки сайтов позволяют вашему приложению отслеживать каналы Twitter в режиме реального времени для большого числа пользователей.
Задача вашей потоковой реализации заключается в том, чтобы как можно быстрее регистрировать входящие события и обрабатывать их в фоновом режиме с использованием REST API по мере необходимости для сбора более детальных данных. Потоки сайтов требуют предварительного одобрения от Twitter, которые, скорее всего, зарезервированы для крупных компаний и разработчиков.
К счастью, есть бесплатная библиотека с открытым исходным кодом Phirehose, которая реализует большинство требований к потоковым API. В этом руководстве будет описано, как интегрировать Phirehose в наше приложение с открытым исходным кодом Birdcage.
Библиотека Phirehose
Phirehose - это просто фантастическая PHP реализация для требований Stream Stream API, написанная Fenn Bailey. Как он сам описывает ее, Phirehose:
- обеспечивают простой интерфейс Streaming API для PHP-приложений
- выполняет рекомендации по потоковым API для обработки ошибок, повторного подключения и т. д.
- направляет клиентов к правильному управлению потоковыми API
- работает независимо от расширений PHP (то есть разделяемой памяти, PCNTL и т. д.),
Я нашел, что библиотека работает совершенно безупречно. Здесь вы найдёте больше документации по Phirehose.
Он предназначен для поддержания связи с Twitter и ответа на фид данных Twitter при неограниченном использовании и без перерывов. Он не предназначен для обработки детальный обработки твитов и гидратации данных, о которых мы говорили во второй части этой серии. Это можно сделать отдельно.
Запуск Phirehose
Как правило, вы не можете запускать типичную задачу cron в Интернете как неопределенную операцию keep-alive. Лучше создать демона командной строки.
Одной из мощных возможностей Yii является возможность запуска консольных приложений из командной строки. Это позволит нам запустить приложение командной строки keep-alive
Создание консольной команды Yii
В каталоге /app/
, за пределами веб-доступного корня, мы добавим файл stream.php
, который запускает нашу консольную консольную команду Phirehose:
1 |
<?php
|
2 |
defined('YII_DEBUG') or define('YII_DEBUG',true); |
3 |
$yii=dirname(__FILE__).'/../framework/yii.php'; |
4 |
$config=dirname(__FILE__).'/protected/config/main.php'; |
5 |
require_once($yii); |
6 |
Yii::createConsoleApplication($config)->run(); |
Затем мы создадим сам командный файл StreamCommand.php
в каталоге /app/protected/commands
:
1 |
<?php
|
2 |
class StreamCommand extends CConsoleCommand |
3 |
{
|
4 |
// test with php ./app/stream.php Stream
|
5 |
public function run($args) |
6 |
{
|
7 |
// get Twitter user account keys
|
8 |
$result = Account::model()->findByPk(1); |
9 |
$c = new Consumer($result['oauth_token'],$result['oauth_token_secret'],Phirehose::METHOD_USER); |
10 |
// load Twitter App keys
|
11 |
$app = UserSetting::model()->loadPrimarySettings(); |
12 |
$c->consumerKey = $app['twitter_key']; |
13 |
$c->consumerSecret = $app['twitter_secret']; |
14 |
$c->consume(); |
15 |
}
|
16 |
}
|
Он запустит процесс Phirehose, Consumer, используя наше приложение Twitter и пользовательские ключи.
Примечание. Для примера потоковой записи Birdcage мы предполагаем, что зарегистрирована только одна учетная запись Twitter и жестко прописываем свои учетные данные, например. account_id = 1.
Интеграция с Phirehose
Чтобы интегрировать Phirehose в Birdcage, я переместил OAuthPhirehose.php
и UserstreamPhirehose.php
в каталог /app /protected/components
. В моем файле конфигурации main.php
я добавил phirehose
в список загруженных компонентов:
1 |
'preload'=>array( |
2 |
'log', |
3 |
'bootstrap', |
4 |
'mailgun', |
5 |
'phirehose', |
6 |
'advanced'
|
7 |
),
|
Затем я создал миграцию базы данных, чтобы создать таблицу для хранения необработанных данных из потока Twitter:
1 |
class m140919_193106_create_stream_table extends CDbMigration |
2 |
{
|
3 |
protected $MySqlOptions = 'ENGINE=InnoDB CHARSET=utf8 COLLATE=utf8_unicode_ci'; |
4 |
public $tablePrefix; |
5 |
public $tableName; |
6 |
|
7 |
public function before() { |
8 |
$this->tablePrefix = Yii::app()->getDb()->tablePrefix; |
9 |
if ($this->tablePrefix <> '') |
10 |
$this->tableName = $this->tablePrefix.'stream'; |
11 |
}
|
12 |
|
13 |
public function safeUp() |
14 |
{
|
15 |
$this->before(); |
16 |
$this->createTable($this->tableName, array( |
17 |
'id' => 'pk', |
18 |
'tweet_id' => 'bigint(20) unsigned NOT NULL', |
19 |
'code' => 'text NULL', |
20 |
'is_processed' => 'tinyint default 0', |
21 |
'created_at' => 'DATETIME NOT NULL DEFAULT 0', |
22 |
'modified_at' => 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP', |
23 |
), $this->MySqlOptions); |
24 |
$this->createIndex('tweet_id', $this->tableName , 'tweet_id', true); |
25 |
|
26 |
}
|
Я также создал новую модель под названием Consumer.php
, которая расширяет OauthPhirehose
с помощью необходимого метода enqueueStatus
.
Мы хотим минимизировать объем обработки, которую необходимо выполнить в режиме реального времени. По сути, мы просто хотим записать данные, полученные из Twitter в нашу базу данных, и ничего больше. Мы можем выполнять остальную обработку в наших собственных фоновых задачах, не замедляя потоковое соединение Phirehose. Моя функция просто берет данные твитов из входящего потока и сохраняет их в таблице потоков:
1 |
<?php |
2 |
class Consumer extends OauthPhirehose |
3 |
{ |
4 |
// This function is called automatically by the Phirehose class |
5 |
// when a new tweet is received with the JSON data in $status |
6 |
public function enqueueStatus($status) { |
7 |
$stream_item = json_decode($status); |
8 |
if (!(isset($stream_item->id_str))) { return;} |
9 |
$s = new Stream; |
10 |
$s->tweet_id = $stream_item->id_str; |
11 |
$s->code = base64_encode(serialize($stream_item)); |
12 |
$s->is_processed=0; |
13 |
$s->created_at = new CDbExpression('NOW()'); |
14 |
$s->modified_at =new CDbExpression('NOW()'); |
15 |
$s->save(); |
16 |
var_dump($stream_item); |
17 |
} |
18 |
} |
19 |
?> |
Мы будем полагаться на фоновые задачи, выполняемые нашим DaemonController
для обработки данных в модели Bircdcage Tweet. Это описано ниже.
Активирование Phirehose
Вы можете протестировать Phirehose с помощью консольной PHP команды:
php ./app/stream.php Stream
Twitter отправит поток информации о последователях для учетной записи пользователя, а затем данные в реальном времени по мере поступления.
Чтобы активировать Phirehose как keep-alive, всегда включенную консольную команду, мы будем использовать команду nohup, например no hangup и перенаправляя вывод на dev/null:
nohup php ./app/stream.php Stream> / dev / null 2> & 1 &
Ubuntu ответит идентификатором задания вашего процесса для будущего его мониторинга и завершения:
[1] 9768
Если вы хотите проверить, что процесс запущен, просканируйте список задач для идентификатора задания:
ps -e все | grep 9768
Вы должны увидеть что-то вроде этого:
0 1000 9768 9743 20 0 273112 16916 poll_s S pts / 0 0:00 php ./app/stream.php Stream
И вы можете завершить Phirehose, убив id задания:
kill 9768
По моему собственному опыту, Phirehose безупречно работал с этой техникой.
Обработка потоковых данных
Нам также необходимо создать фоновый процесс в Birdcage, который будет обрабатывать потоковые данные в наших таблицах Twitter, Mention, URL и Hashtag, как если бы они появились в REST API.
Измените настройки файла twitter.ini
для использования потоков:
1 |
twitter_stream = true |
И мы можем использовать одно и то же задание cron из второй части для выполнения этой операции:
1 |
# To define the time you can provide concrete values for
|
2 |
# minute (m), hour (h), day of month (dom), month (mon),
|
3 |
# and day of week (dow) or use '*' in these fields (for 'any').#
|
4 |
# Notice that tasks will be started based on the cron's system
|
5 |
# daemon's notion of time and timezones.
|
6 |
#
|
7 |
# For example, you can run a backup of all your user accounts
|
8 |
# at 5 a.m every week with:
|
9 |
# 0 5 * * 1 tar -zcf /var/backups/home.tgz /home/
|
10 |
#
|
11 |
# m h dom mon dow command
|
12 |
*/5 * * * * wget -O /dev/null http://birdcage.yourdomain.com/daemon/index |
Затем, когда вызывается DaemonController, он активирует метод Stream model()
:
1 |
public function actionIndex() { |
2 |
// if not using twitter streams, we'll process tweets by REST API
|
3 |
if (!Yii::app()->params['twitter_stream']) { |
4 |
Tweet::model()->getStreams(); |
5 |
} else { |
6 |
Stream::model()->process(); |
7 |
}
|
8 |
}
|
Метод процесса распаковывает данные закодированного потока и анализирует каждую запись так же, как мы сделали с ответом из REST API:
1 |
public function process() { |
2 |
// get unprocessed tweets from stream engine
|
3 |
// to do
|
4 |
$account_id = 1; |
5 |
$items = Stream::model()->unprocessed()->findAll(); |
6 |
foreach ($items as $i) { |
7 |
$tweet = unserialize(base64_decode($i['code'])); |
8 |
Tweet::model()->parse($account_id,$tweet); |
9 |
$this->setStatus($i['id'],self::STREAM_PROCESSED); |
10 |
}
|
11 |
}
|
Birdcage в настоящее время игнорирует данные из потока, который не является твитом, например. уведомления, прямые сообщения и т. д. Я оставлю это вам для расширения - или вы можете проверить мое расширенное приложение Birdhouse.
В заключение
Надеюсь, вы нашли эту серию из трех статей о Twitter API полезной и интересной. К настоящему времени вы узнали об OAuth, REST API, Streaming API, создании базы данных для Twitter, обработке временной шкалы с использованием обоих типов API-интерфейсов, правильном подсчете символов в твитах и т.д.
Пожалуйста, размещайте любые комментарии, исправления или дополнительные идеи ниже. Вы можете найти другие мои уроки Tuts + на моей странице автора или подписаться на меня в Twitter @reifman.