ストリーミング

Stream は Twitter API を用いることでリアルタイムに ツイートの filteringsampling を可能にします.

ストリームは Streaming HTTP プロトコルを利用して オープンでストリーミングな API 接続を経由してデータを配信します. REST API のようにクライアントアプリケーションから 繰り返しリクエストすることで一括でデータを配信するのではなく, アプリケーションと API の間に単一の接続がなされ, 新しく条件のマッチが起こるたびに新しい結果が配信されます. この結果, 非常に高いスループットをサポートする低レイテンシな 配信メカニズムが可能となります. さらに詳しくは, 以下を参照してください. https://developer.twitter.com/en/docs/tutorials/consuming-streaming-data

使い方 Stream

Stream を使うには, Twitter API の認証情報 (Consumer key, Consumer Secret, アクセストークン, アクセストークンシークレット) を用いて インスタンスを初期化する必要があります.

import tweepy

stream = tweepy.Stream(
  "Consumer Key here", "Consumer Secret here",
  "Access Token here", "Access Token Secret here"
)

そして, Stream.filter() または Stream.sample() を使うことで, ストリームに接続して実行することができます.

stream.filter(track=["Tweepy"])

ストリームから受け取ったデータは, Stream.on_data() に渡されます. このメソッドはメッセージタイプに基づいて, 他のメソッドにデータを送る処理をします. 例えば, ストリームからツイートを受け取ると, 生データは Stream.on_data() に送られ, Status オブジェクトを構築し, Stream.on_status() に渡します. デフォルトでは Stream.on_data() 以外のストリームからデータを受け取るメソッドは 受信したデータを単にログに記録するだけで, logging level は データタイプに依存しています.

To customize the processing of the stream data, Stream needs to be subclassed. For example, to print the IDs of every Tweet received:

class IDPrinter(tweepy.Stream):

    def on_status(self, status):
        print(status.id)


printer = IDPrinter(
  "Consumer Key here", "Consumer Secret here",
  "Access Token here", "Access Token Secret here"
)
printer.sample()

Threading

Both Stream.filter() and Stream.sample() have a threaded parameter. When set to True, the stream will run in a separate thread, which is returned by the call to either method. For example:

thread = stream.filter(follow=[1072250532645998596], threaded=True)

Handling Errors

Stream has multiple methods to handle errors during streaming. Stream.on_closed() is called when the stream is closed by Twitter. Stream.on_connection_error() is called when the stream encounters a connection error. Stream.on_request_error() is called when an error is encountered while trying to connect to the stream. When these errors are encountered and max_retries, which defaults to infinite, hasn’t been exceeded yet, the Stream instance will attempt to reconnect the stream after an appropriate amount of time. By default, all three of these methods log an error. To customize that handling, they can be overriden in a subclass:

class ConnectionTester(tweepy.Stream):

    def on_connection_error(self):
        self.disconnect()

Stream.on_request_error() is also passed the HTTP status code that was encountered. The HTTP status codes reference for the Twitter API can be found at https://developer.twitter.com/en/support/twitter-api/error-troubleshooting.

Stream.on_exception() is called when an unhandled exception occurs. This is fatal to the stream, and by default, an exception is logged.