Streaming Connections

class sitebucket.listener.SiteStream(follow, consumer, token, stream_with='user', parser=<sitebucket.parser.DefaultParser object>)

The SiteStream object establishes an authenticated connection via OAuth to Twitter’s site streams endpoint and parses the returned data.

Arguments:

  • follow – a list of IDs of users who have authorized your app; the stream follows these users.
  • consumer – a python-oauth2 Consumer object for the app.
  • token – a python-oauth2 Token object for the app owner's account.
  • stream_with – ‘user’ or ‘followings’. With ‘user’ (the default), the stream returns only data about actions taken by the users listed in follow. With ‘followings’, the SiteStream object returns data about the accounts those users follow, roughly equivalent to their home timelines.
  • parser – an instance of a BaseParser subclass that handles data returned by the stream.
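As a rough illustration of how follow and stream_with might become request parameters, the sketch below builds the two query parameters that Twitter's Site Streams endpoint accepted: a comma-separated follow list and a with value. The helper build_site_stream_params is hypothetical; SiteStream does not necessarily expose or use such a function.

```python
from typing import Dict, Iterable

VALID_STREAM_WITH = ("user", "followings")


def build_site_stream_params(follow: Iterable[int], stream_with: str = "user") -> Dict[str, str]:
    """Hypothetical helper: turn SiteStream-style arguments into query parameters."""
    if stream_with not in VALID_STREAM_WITH:
        raise ValueError("stream_with must be 'user' or 'followings', got %r" % (stream_with,))
    user_ids = [str(int(uid)) for uid in follow]
    if not user_ids:
        raise ValueError("follow must contain at least one user ID")
    # Site Streams took the followed user IDs as one comma-separated value.
    return {"follow": ",".join(user_ids), "with": stream_with}


print(build_site_stream_params([1, 2, 3]))  # {'follow': '1,2,3', 'with': 'user'}
```

Validating stream_with up front turns a typo into an immediate ValueError instead of a confusing response from the server.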

To use, first import SiteStream and oauth2:

>>> from sitebucket import SiteStream
>>> import oauth2

Then generate your Consumer and Token objects:

>>> token = oauth2.Token('key', 'secret')
>>> consumer = oauth2.Consumer('key', 'secret')

And then instantiate your SiteStream object:

>>> stream = SiteStream([1,2,3], consumer, token)

connect repeatedly attempts to connect to the streaming server until a connection succeeds or the failure conditions are met. It returns an httplib response object on success, or None after a disconnect request or an error.

>>> stream.connect() 
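The retry behaviour described above can be sketched as a loop that keeps calling a connection attempt until it succeeds, the retry limit is reached, or a stop is requested. This is a minimal sketch assuming connection failures surface as OSError; connect_with_retries and its parameters are hypothetical and do not reflect SiteStream's actual internals.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def connect_with_retries(
    attempt: Callable[[], T],
    retry_limit: int = 5,
    retry_time: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
    should_stop: Callable[[], bool] = lambda: False,
) -> Optional[T]:
    """Call attempt() until it succeeds, retries run out, or should_stop() is True.

    Returns attempt()'s result on success, or None when giving up or stopping.
    """
    error_count = 0
    while error_count < retry_limit and not should_stop():
        try:
            return attempt()
        except OSError:  # ConnectionError and socket errors are OSError subclasses
            error_count += 1
            if error_count < retry_limit:
                sleep(retry_time)
                retry_time = retry_time ** 2  # squared, like the sleep method described below
    return None
```

Passing sleep in as a parameter lets tests record the delays instead of actually waiting.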

disconnect sets flags that cause the stream processing loops to terminate. Because listen blocks, disconnect must be called from another thread to terminate the connection while listen is still running.

>>> stream.disconnect()
>>> stream.disconnect_issued
>>> stream.running
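The interplay between a blocking listen loop and a disconnect issued from another thread can be sketched with a toy class. ToyStream is hypothetical; it only mirrors the running and disconnect_issued flags named above. In this toy loop the flag is checked between reads, so a disconnect takes effect once the current read returns. A threading.Event would be the more idiomatic signal; plain booleans are used here to match the documented flags.

```python
import threading
import time


class ToyStream:
    """Hypothetical stand-in for SiteStream: a blocking read loop controlled by flags."""

    def __init__(self):
        self.running = False
        self.disconnect_issued = False
        self.messages_read = 0

    def listen(self):
        self.running = True
        # Blocks the calling thread until another thread calls disconnect().
        while not self.disconnect_issued:
            self.messages_read += 1  # stand-in for reading one chunk from the socket
            time.sleep(0.01)
        self.running = False

    def disconnect(self):
        # Only sets a flag; listen() notices it on its next pass through the loop.
        self.disconnect_issued = True


stream = ToyStream()
worker = threading.Thread(target=stream.listen)
worker.start()
time.sleep(0.05)      # let the loop run briefly
stream.disconnect()   # issued from the main thread
worker.join(timeout=1)
print(stream.disconnect_issued, stream.running, worker.is_alive())  # True False False
```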

listen initiates a streaming connection via the connect method and processes data from Twitter as it is received.

Important: This method blocks until the connection fails or another thread sets flags that cause the read loop to terminate.

>>> stream.listen() 

When a complete message (JSON terminated by \r\n) is received from the stream, on_receive passes the data to the parser object’s parse method and clears the buffer.

If we have a custom parser defined like so:

>>> class myparser(BaseParser):
...     def parse(self, data):
...     print(data.strip())

and we tell our stream object to use this parser, it will simply print each message it receives from the stream.

>>> stream.parser = myparser()
>>> stream.on_receive('{"some": "json"}\r\n')
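Framing a byte stream into \r\n-delimited messages can be sketched as below. MessageBuffer is hypothetical. Unlike the description above, which clears the whole buffer, this variant keeps any partial message that follows the delimiter, and it skips the blank keep-alive lines that Twitter's streaming endpoints sent; SiteStream's exact behaviour may differ.

```python
from typing import Callable, List


class MessageBuffer:
    """Hypothetical sketch of \r\n-delimited message framing for an on_receive-style method."""

    def __init__(self, parse: Callable[[str], None]):
        self.parse = parse
        self.buffer = ""

    def on_receive(self, data: str) -> None:
        self.buffer += data
        # Hand every complete message to the parser; keep any trailing fragment.
        while "\r\n" in self.buffer:
            message, self.buffer = self.buffer.split("\r\n", 1)
            if message.strip():  # skip blank keep-alive lines
                self.parse(message)


received: List[str] = []
buf = MessageBuffer(received.append)
buf.on_receive('{"some": ')                  # incomplete: nothing parsed yet
buf.on_receive('"json"}\r\n\r\n{"next"')     # completes one message, then a keep-alive
print(received)          # ['{"some": "json"}']
print(repr(buf.buffer))  # '{"next"'
```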

The request property returns an oauth2 Request object for connecting to the streaming API endpoint. For debugging, the returned request object is cached in self._last_request.

>>> req = stream.request
>>> print(req.to_url())
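python-oauth2 signs the request for you, but it can help when debugging to see what HMAC-SHA1 signing under OAuth 1.0a (RFC 5849) involves. The sketch below builds the signature base string and signature using only the standard library. The function names are hypothetical, the URL is illustrative, the real request also carries the other oauth_* parameters, and URL normalization (lowercasing, default ports) is not handled.

```python
import base64
import hashlib
import hmac
from typing import Dict, Tuple
from urllib.parse import quote


def _pct(value: str) -> str:
    # RFC 3986 percent-encoding: only unreserved characters (A-Z a-z 0-9 - . _ ~) stay literal.
    return quote(value, safe="~")


def hmac_sha1_signature(method: str, url: str, params: Dict[str, str],
                        consumer_secret: str, token_secret: str) -> Tuple[str, str]:
    """Return (signature base string, base64 HMAC-SHA1 signature) per RFC 5849."""
    # Normalize parameters: encode keys and values, sort, join as k=v pairs with '&'.
    pairs = sorted((_pct(k), _pct(v)) for k, v in params.items())
    param_string = "&".join("%s=%s" % pair for pair in pairs)
    base_string = "&".join([method.upper(), _pct(url), _pct(param_string)])
    # The signing key is the encoded consumer secret and token secret joined by '&'.
    key = "%s&%s" % (_pct(consumer_secret), _pct(token_secret))
    digest = hmac.new(key.encode("utf-8"), base_string.encode("utf-8"), hashlib.sha1).digest()
    return base_string, base64.b64encode(digest).decode("ascii")


base, sig = hmac_sha1_signature(
    "GET",
    "https://sitestream.twitter.com/1.1/site.json",
    {"follow": "1,2,3", "with": "user", "oauth_nonce": "abc123"},
    consumer_secret="consumer-secret",
    token_secret="token-secret",
)
print(base)
```

Note that parameter values are percent-encoded twice in the base string: once when normalized and once more when the whole parameter string is encoded.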

reset_throttles resets all stream throttles, flags, and buffers to their default values. Use it to prepare a SiteStream object for a reconnection attempt or to clear error states after a successful connection.

>>> stream.error_count = RETRY_LIMIT # Error count at limit
>>> stream.retry_time = 10000.0 # really high retry time
>>> stream.buffer = 'leftover input' # buffer with unfinished input
>>> stream.reset_throttles() # reset!
>>> stream.retry_limit == RETRY_LIMIT
>>> stream.retry_time == RETRY_TIME
>>> stream.error_count == 0
>>> stream.timeout == TIMEOUT
>>> stream.buffer == ''
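The reset pattern above can be sketched as a small class whose constructor calls the same reset method, so the defaults live in one place. ThrottleState is hypothetical, and the values given to RETRY_LIMIT, RETRY_TIME, and TIMEOUT here are placeholders; sitebucket's real constants may differ.

```python
# Placeholder defaults; sitebucket's actual RETRY_LIMIT, RETRY_TIME and TIMEOUT may differ.
RETRY_LIMIT = 10
RETRY_TIME = 2.0
TIMEOUT = 90.0


class ThrottleState:
    """Hypothetical sketch of the state a reset_throttles()-style method restores."""

    def __init__(self):
        # Reusing reset_throttles() keeps the defaults defined in a single place.
        self.reset_throttles()

    def reset_throttles(self):
        self.retry_limit = RETRY_LIMIT
        self.retry_time = RETRY_TIME
        self.timeout = TIMEOUT
        self.error_count = 0
        self.buffer = ""


state = ThrottleState()
state.error_count = RETRY_LIMIT      # error count at limit
state.retry_time = 10000.0           # very high retry time
state.buffer = "leftover input"      # buffer with unfinished input
state.reset_throttles()
print(state.error_count, state.retry_time, repr(state.buffer))  # 0 2.0 ''
```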

retry_ok is True if this SiteStream object has not exceeded its reconnection limits, and False once it has.

>>> stream.retry_ok
>>> stream.error_count = stream.retry_limit # Raise error count to max
>>> stream.retry_ok

sleep(stime=None, update_error_count=True, close_connection=True)

sleep causes the current thread to sleep for the amount of time in the stream’s retry_time property, then squares retry_time. It can optionally increment the error_count property and close the connection.

  • stime – a sleep time that overrides the object’s retry_time property. If this is set, retry_time is not squared.
  • update_error_count – if True, the method increments the error_count property.
  • close_connection – if True, the method closes the connection.

>>> old_error_count = stream.error_count
>>> stream.sleep(0)
>>> stream.error_count > old_error_count
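The backoff bookkeeping described for sleep and retry_ok can be sketched as follows. Backoff is hypothetical and omits close_connection; the sleeper parameter exists only so the example can record delays instead of waiting. Squaring grows the delay only while retry_time is greater than 1; a value below 1 would shrink on each call.

```python
import time
from typing import Callable, Optional


class Backoff:
    """Hypothetical sketch of sleep()/retry_ok bookkeeping with squared retry times."""

    def __init__(self, retry_time: float = 2.0, retry_limit: int = 5,
                 sleeper: Callable[[float], None] = time.sleep):
        self.retry_time = retry_time
        self.retry_limit = retry_limit
        self.error_count = 0
        self._sleeper = sleeper

    @property
    def retry_ok(self) -> bool:
        # True while the error count is still below the limit.
        return self.error_count < self.retry_limit

    def sleep(self, stime: Optional[float] = None, update_error_count: bool = True) -> None:
        if stime is None:
            self._sleeper(self.retry_time)
            self.retry_time = self.retry_time ** 2  # squared only when retry_time was used
        else:
            self._sleeper(stime)  # an explicit override leaves retry_time untouched
        if update_error_count:
            self.error_count += 1


slept = []
backoff = Backoff(retry_time=2.0, retry_limit=2, sleeper=slept.append)
backoff.sleep()    # sleeps 2.0, retry_time becomes 4.0
backoff.sleep(0)   # sleeps 0, retry_time unchanged
print(slept, backoff.retry_time, backoff.error_count, backoff.retry_ok)  # [2.0, 0] 4.0 2 False
```

Checking stime against None, rather than for truthiness, means an explicit stime of 0 still counts as an override.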

The url property returns the stream endpoint URL built from the PROTOCOL, SITE_STREAM_HOST, and URI values.

>>> stream = SiteStream([1,2], consumer, token)
>>> stream.url
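Assembling a URL from separate protocol, host, and path values can be sketched with urllib.parse.urlunsplit. The constant values below are placeholders chosen for illustration, and stream_url is a hypothetical helper, not sitebucket's implementation.

```python
from urllib.parse import urlunsplit

# Placeholder values for the names used above; sitebucket's real ones may differ.
PROTOCOL = "https"
SITE_STREAM_HOST = "sitestream.twitter.com"
URI = "/1.1/site.json"


def stream_url(protocol: str = PROTOCOL, host: str = SITE_STREAM_HOST, uri: str = URI) -> str:
    # urlunsplit takes a (scheme, netloc, path, query, fragment) tuple.
    return urlunsplit((protocol, host, uri, "", ""))


print(stream_url())  # https://sitestream.twitter.com/1.1/site.json
```

Keeping the pieces separate makes it easy to point a test at a local server by overriding only the host.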

Spawning Streaming Connections in New Threads

class sitebucket.thread.ListenThread(stream, *args, **kwargs)

ListenThread is a thread wrapper for listener.SiteStream objects. Create a ListenThread with a SiteStream object (required), then call its start method to connect to the stream and parse the output in a separate thread.

  • stream – an instance of listener.SiteStream

>>> thread = ListenThread(stream)
>>> stream ==
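A thread wrapper of this kind can be sketched as a threading.Thread subclass whose run method calls the stream's blocking listen method. This assumes ListenThread subclasses threading.Thread, which the text implies but does not state; SketchListenThread and EchoStream are hypothetical names. Calling run directly executes listen in the calling thread, while start executes it in a new one.

```python
import threading


class EchoStream:
    """Hypothetical stream whose listen() records which thread ran it."""

    def __init__(self):
        self.listened_in = None
        self.disconnect_issued = False

    def listen(self):
        self.listened_in = threading.current_thread().name

    def disconnect(self):
        self.disconnect_issued = True


class SketchListenThread(threading.Thread):
    """Sketch of a Thread subclass that runs a stream's blocking listen() loop."""

    def __init__(self, stream, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.stream = stream
        self.daemon = True  # don't keep the interpreter alive just for the stream

    def run(self):
        # run() executes in whichever thread calls it; start() calls it in a new thread.
        self.stream.listen()

    def close(self):
        # Only asks the stream to stop; the thread exits once listen() returns.
        self.stream.disconnect()


stream = EchoStream()
thread = SketchListenThread(stream, name="stream-worker")
thread.start()
thread.join()
print(stream.listened_in)  # stream-worker
```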

close sets flags in the thread’s stream object that cause its reading and connection loops to exit, which eventually causes the thread to exit. The connection can then be restarted with the object’s restart method.

>>> thread = ListenThread(stream)
>>> thread.close()

connection_healthy is True if the thread’s stream object has not yet initialized its connection, has a healthy connection, or is still trying to connect. It is False if connecting failed and all allotted retry attempts have been exhausted.

An uninitialized connection:

>>> thread = ListenThread(stream)
>>> thread.connection_healthy

A stream that has failed:

>>> thread = ListenThread(failed_stream)
>>> thread.connection_healthy

restart resets the stream object’s failure flags and returns a new ListenThread object that wraps the dead thread’s stream object. Use this method to retry the connection once the connection_healthy property starts returning False.

>>> thread = ListenThread(failed_stream)
>>> thread.restart() 
<ListenThread(..., ...)>
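Returning a new object from restart fits how Python threads work: a threading.Thread instance can only be started once, and a second start() raises RuntimeError. The sketch below shows that constraint together with a simplified connection_healthy check. RestartableThread and FlakyStream are hypothetical; treating health as the stream's retry_ok value is an assumption, and this sketch starts the replacement thread immediately, which the text above does not confirm for ListenThread.

```python
import threading


class FlakyStream:
    """Hypothetical stream that has already exhausted its retries."""

    def __init__(self):
        self.error_count = 3
        self.retry_limit = 3
        self.listen_calls = 0

    @property
    def retry_ok(self):
        return self.error_count < self.retry_limit

    def reset_throttles(self):
        self.error_count = 0

    def listen(self):
        self.listen_calls += 1


class RestartableThread(threading.Thread):
    def __init__(self, stream):
        super().__init__(daemon=True)
        self.stream = stream

    def run(self):
        self.stream.listen()

    @property
    def connection_healthy(self):
        # Simplified rule: healthy until the stream runs out of retry attempts.
        return self.stream.retry_ok

    def restart(self):
        # A Thread object can only be started once, so wrap the same stream in a new one.
        self.stream.reset_throttles()
        new_thread = RestartableThread(self.stream)
        new_thread.start()
        return new_thread


stream = FlakyStream()
old = RestartableThread(stream)
old.start()
old.join()
print(old.connection_healthy)  # False
try:
    old.start()
except RuntimeError:
    print("a Thread cannot be started twice")
new = old.restart()
new.join()
print(new.connection_healthy, stream.listen_calls)  # True 2
```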

This method invokes the stream object’s listen method in the calling thread. To run the stream in a new thread, use the object’s start method instead.

>>> thread = ListenThread(stream)
