Streaming Connections

class sitebucket.listener.SiteStream(follow, consumer, token, stream_with='user', parser=<sitebucket.parser.DefaultParser object>)

The SiteStream object establishes an authenticated connection via OAuth to Twitter’s site streams endpoint and parses the returned data.

Arguments:

  • follow – a list of the user IDs of users who have authorized your app; the stream returns data for these users.
  • consumer – a python-oauth2 Consumer object for the app.
  • token – a python-oauth2 Token object for the app owner’s account.
  • stream_with – ‘user’ or ‘followings’ (default ‘user’). With ‘user’, the stream only returns data about actions taken by the users listed in follow. With ‘followings’, it also returns data about the accounts those users follow (effectively their home timelines).
  • parser – an instance of a BaseParser subclass that handles the data returned by the stream. Defaults to a DefaultParser instance.

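Twitter’s Site Streams endpoint receives these settings as the follow (comma-separated user IDs) and with query parameters. The sketch below shows one way the follow and stream_with arguments could be turned into those parameters; site_stream_params is a hypothetical helper, not part of sitebucket, and the exact mapping sitebucket performs is an assumption.

```python
# Hypothetical helper: illustrates the Site Streams query parameters only;
# sitebucket's own request-building code may differ.
def site_stream_params(follow, stream_with='user'):
    """Map SiteStream-style arguments onto Site Streams query parameters."""
    if stream_with not in ('user', 'followings'):
        raise ValueError("stream_with must be 'user' or 'followings'")
    return {
        # Site Streams expects the user IDs as one comma-separated string.
        'follow': ','.join(str(user_id) for user_id in follow),
        'with': stream_with,
    }


print(site_stream_params([1, 2, 3]))
# {'follow': '1,2,3', 'with': 'user'}
```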
To use, first import SiteStream and oauth2:

>>> from sitebucket import SiteStream
>>> import oauth2

Then generate your Consumer and Token objects:

>>> token = oauth2.Token('key', 'secret')
>>> consumer = oauth2.Consumer('key', 'secret')

And then instantiate your SiteStream object:

>>> stream = SiteStream([1, 2, 3], consumer, token)

connect()

Repeatedly attempts to connect to the streaming server until a connection succeeds, the retry limits are exhausted, or a disconnect is requested. Returns an httplib response object on success, or None after a disconnect request or an error.
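A minimal sketch of that retry loop, assuming a failed attempt raises IOError and that the back-off and limit checks behave like the sleep method and the retry_ok property. connect_with_retries and its four callable parameters are hypothetical names used only for illustration.

```python
def connect_with_retries(attempt, retry_ok, backoff, disconnect_requested):
    """Hypothetical sketch of a connect() retry loop.

    attempt()              -- returns a response, or raises IOError on failure
    retry_ok()             -- False once the retry limit is exhausted
    backoff()              -- sleeps and increments the error count
    disconnect_requested() -- True once a disconnect has been issued
    """
    while not disconnect_requested():
        if not retry_ok():
            return None          # retry limit reached: give up
        try:
            return attempt()     # success: hand back the response
        except IOError:
            backoff()            # wait before the next attempt
    return None                  # a disconnect was requested


# Usage: a connection that fails twice and then succeeds.
state = {'calls': 0, 'errors': 0}

def flaky_attempt():
    state['calls'] += 1
    if state['calls'] < 3:
        raise IOError('connection refused')
    return 'response'

def count_error():
    state['errors'] += 1

result = connect_with_retries(flaky_attempt,
                              lambda: state['errors'] < 5,
                              count_error,
                              lambda: False)
print(result, state['errors'])  # response 2
```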

>>> stream.connect()

disconnect()

This method sets flags that cause the stream-processing loops to terminate. Because the listen method blocks, disconnect must be called from another thread to stop a connection while listen is still running.
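The threading pattern can be illustrated with a hypothetical ToyStream class that mimics the running and disconnect_issued flags; it is a sketch, not sitebucket’s implementation. Its loop checks disconnect_issued as well as running, so a disconnect issued before listen starts still takes effect.

```python
import threading
import time


class ToyStream(object):
    """Hypothetical stand-in that mimics SiteStream's stop flags."""

    def __init__(self):
        self.running = False
        self.disconnect_issued = False

    def listen(self):
        self.running = True
        try:
            # Blocks until another thread flips one of the flags.
            while self.running and not self.disconnect_issued:
                time.sleep(0.01)     # stand-in for a blocking socket read
        finally:
            self.running = False

    def disconnect(self):
        self.disconnect_issued = True
        self.running = False


stream = ToyStream()
worker = threading.Thread(target=stream.listen)
worker.daemon = True                 # never keep the interpreter alive
worker.start()
stream.disconnect()                  # issued from the main thread
worker.join(timeout=1)
print(worker.is_alive(), stream.disconnect_issued, stream.running)
# False True False
```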

>>> stream.disconnect()
>>> stream.disconnect_issued
True
>>> stream.running
False

listen()

listen initiates a streaming connection via the connect method and processes data from Twitter as it is received.

Important: This method blocks until the connection fails or another thread sets flags that cause the read loop to terminate.
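A rough sketch of such a read loop, assuming the response behaves like a file object whose read method returns bytes, and an empty result once the server closes the connection. The listen function and its is_running callback are hypothetical stand-ins for the flag checks described above.

```python
import codecs
import io


def listen(response, on_receive, is_running, chunk_size=1024):
    """Hypothetical read loop: pass decoded chunks to on_receive until stopped."""
    decoder = codecs.getincrementaldecoder('utf-8')()
    while is_running():
        chunk = response.read(chunk_size)
        if not chunk:            # b'' means the server closed the connection
            break
        # The incremental decoder holds back a multi-byte character that is
        # split across two reads instead of raising UnicodeDecodeError.
        text = decoder.decode(chunk)
        if text:
            on_receive(text)


received = []
body = u'{"text": "caf\u00e9"}\r\n'.encode('utf-8')
listen(io.BytesIO(body), received.append, lambda: True, chunk_size=3)
print(''.join(received) == body.decode('utf-8'))  # True
```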

>>> stream.listen()

on_receive(data)

When a complete message (JSON terminated by \r\n) has been received from the stream, on_receive passes the data to the parser object’s parse method and clears the buffer.
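The buffering step can be sketched as follows. MessageBuffer is a hypothetical class; unlike the parser example that follows, it strips the \r\n terminator before calling parse, and it skips the blank keep-alive lines that Twitter’s streaming API sends between messages.

```python
class MessageBuffer(object):
    """Hypothetical sketch of on_receive-style buffering."""

    TERMINATOR = '\r\n'

    def __init__(self, parse):
        self.parse = parse       # callable standing in for parser.parse
        self.buffer = ''

    def on_receive(self, data):
        self.buffer += data
        # Dispatch every complete message; keep a trailing partial one.
        while self.TERMINATOR in self.buffer:
            message, self.buffer = self.buffer.split(self.TERMINATOR, 1)
            if message.strip():  # ignore blank keep-alive lines
                self.parse(message)


messages = []
buf = MessageBuffer(messages.append)
buf.on_receive('{"a": 1}\r\n{"b"')
buf.on_receive(': 2}\r\n\r\n')
print(messages)  # ['{"a": 1}', '{"b": 2}']
```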

If we have a custom parser defined like so:

>>> class myparser(BaseParser):
...     def parse(self, data):
...         print(data.strip())

and we tell our stream object to use this parser, it will simply print the data anytime it is received from the stream.

>>> stream.parser = myparser()
>>> stream.on_receive("{'some':'json'}\r\n")
{'some':'json'}

request

Returns an oauth2 request object to be used for connecting to the streaming API endpoint. For debugging purposes, the returned request object is cached in self._last_request.
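A minimal sketch of this caching pattern, assuming a fresh request (with a new OAuth nonce and timestamp) is built on every access. RequestCache and its build callable are hypothetical; build stands in for python-oauth2’s request creation and signing.

```python
import itertools


class RequestCache(object):
    """Hypothetical sketch: build a new request per access, remember the last."""

    def __init__(self, build):
        self._build = build          # stands in for creating and signing a request
        self._last_request = None

    @property
    def request(self):
        req = self._build()          # fresh object on every access
        self._last_request = req     # kept for debugging only
        return req


counter = itertools.count(1)
cache = RequestCache(lambda: next(counter))
first, second = cache.request, cache.request
print(first, second, cache._last_request)  # 1 2 2
```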

>>> req = stream.request
>>> print(req.to_url())
https://sitestream.twitter.com/2b/site.json?...

reset_throttles()

Resets all stream throttles, flags, and buffers to their default values. Use it to prepare a SiteStream object for a reconnection attempt or to clear error states after a successful connection.
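The sketch below mirrors the attributes checked in the documented example. The numbers assigned to RETRY_LIMIT, RETRY_TIME, and TIMEOUT are placeholders, since their real values are not documented here, and the Throttles class is hypothetical.

```python
# Placeholder values: sitebucket's actual defaults are not documented here.
RETRY_LIMIT = 10
RETRY_TIME = 2.0
TIMEOUT = 90.0


class Throttles(object):
    """Hypothetical sketch of the state reset_throttles restores."""

    def __init__(self):
        self.reset_throttles()

    def reset_throttles(self):
        # Flags such as disconnect_issued are left out because the documented
        # example does not show which ones are reset.
        self.retry_limit = RETRY_LIMIT
        self.retry_time = RETRY_TIME
        self.timeout = TIMEOUT
        self.error_count = 0
        self.buffer = ''


t = Throttles()
t.error_count, t.retry_time, t.buffer = RETRY_LIMIT, 10000.0, 'leftover input'
t.reset_throttles()
print(t.error_count, t.retry_time, repr(t.buffer))  # 0 2.0 ''
```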

>>> stream.error_count = RETRY_LIMIT # Error count at limit
>>> stream.retry_time = 10000.0 # really high retry time
>>> stream.buffer = 'leftover input' # buffer with unfinished input
>>> stream.reset_throttles() # reset!
>>> stream.retry_limit == RETRY_LIMIT
True
>>> stream.retry_time == RETRY_TIME
True
>>> stream.error_count == 0
True
>>> stream.timeout == TIMEOUT
True
>>> stream.buffer == ''
True

retry_ok

Returns False once this SiteStream object has exhausted its reconnection attempts (error_count has reached retry_limit), and True otherwise.
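Judging from the example below, the check amounts to comparing error_count against retry_limit. A minimal sketch under that assumption, using a hypothetical RetryState class:

```python
class RetryState(object):
    """Hypothetical sketch of the retry_ok check."""

    def __init__(self, retry_limit):
        self.retry_limit = retry_limit
        self.error_count = 0

    @property
    def retry_ok(self):
        # Reconnecting is allowed only while errors remain below the limit.
        return self.error_count < self.retry_limit


state = RetryState(retry_limit=3)
print(state.retry_ok)            # True
state.error_count = state.retry_limit
print(state.retry_ok)            # False
```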

>>> stream.retry_ok
True
>>> stream.error_count = stream.retry_limit # Raise error count to max
>>> stream.retry_ok
False

sleep(stime=None, update_error_count=True, close_connection=True)

Causes the thread to sleep for the amount of time specified in the stream’s retry_time property; retry_time is then squared. Optionally, the method also increments the error_count property and closes the connection.

  • stime – sleep time that overrides the object’s retry_time property. If this is set, retry_time is not squared.
  • update_error_count – if this is true, the method increments the error_count property
  • close_connection – if this is true, the method terminates the connection.
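A sketch of this bookkeeping, using a hypothetical Backoff class with an injectable sleeper so it can be exercised without waiting. The order of operations (closing the connection and counting the error before sleeping) is an assumption.

```python
import time


class Backoff(object):
    """Hypothetical sketch of SiteStream.sleep's bookkeeping."""

    def __init__(self, retry_time=2.0, sleeper=time.sleep):
        self.retry_time = retry_time
        self.error_count = 0
        self.connection = None       # stand-in for an open connection
        self._sleeper = sleeper      # injectable so the sketch can run instantly

    def sleep(self, stime=None, update_error_count=True, close_connection=True):
        if close_connection and self.connection is not None:
            self.connection.close()
            self.connection = None
        if update_error_count:
            self.error_count += 1
        if stime is None:
            self._sleeper(self.retry_time)
            self.retry_time = self.retry_time ** 2   # squared, as documented
        else:
            self._sleeper(stime)                      # override: retry_time unchanged


slept = []
backoff = Backoff(retry_time=2.0, sleeper=slept.append)
backoff.sleep()
backoff.sleep()
backoff.sleep(0, update_error_count=False)
print(slept, backoff.retry_time, backoff.error_count)  # [2.0, 4.0, 0] 16.0 2
```

Squaring only lengthens the delay when retry_time is greater than 1: a value of 1.0 stays at 1.0, and a value below 1 shrinks on each call.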
>>> old_error_count = stream.error_count
>>> stream.sleep(0)
>>> stream.error_count > old_error_count
True

url

Returns the streaming endpoint URL built from PROTOCOL, SITE_STREAM_HOST, and URI.
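How the three names combine can be sketched as below. Only the combined URL appears in this documentation, so the way it is split across PROTOCOL, SITE_STREAM_HOST, and URI here is an assumption, and build_url is a hypothetical helper.

```python
# Assumed split: only the combined URL appears in this documentation.
PROTOCOL = 'https'
SITE_STREAM_HOST = 'sitestream.twitter.com'
URI = '/2b/site.json'


def build_url(protocol=PROTOCOL, host=SITE_STREAM_HOST, uri=URI):
    """Hypothetical helper joining scheme, host, and path."""
    return '%s://%s%s' % (protocol, host, uri)


print(build_url())  # https://sitestream.twitter.com/2b/site.json
```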

>>> stream = SiteStream([1,2], consumer, token)
>>> stream.url
'https://sitestream.twitter.com/2b/site.json'

Spawning Streaming Connections in new Threads

class sitebucket.thread.ListenThread(stream, *args, **kwargs)

ListenThread is a thread wrapper for listener.SiteStream objects. Create a ListenThread with the required SiteStream object, then call the ListenThread’s start method to connect to the stream and parse its output in a separate thread.

  • stream – an instance of listener.SiteStream
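A minimal sketch of such a wrapper, assuming close delegates to the stream’s disconnect method. ListenThreadSketch and FakeStream are hypothetical; FakeStream blocks on a threading.Event in place of a network read. run executes listen in whichever thread calls it, while start calls run in a new thread.

```python
import threading


class ListenThreadSketch(threading.Thread):
    """Hypothetical stand-in for sitebucket.thread.ListenThread."""

    def __init__(self, stream, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        self.stream = stream

    def run(self):
        # Runs in the calling thread; start() invokes it in a new thread.
        self.stream.listen()

    def close(self):
        # Ask the stream's loops to exit; the thread ends once listen returns.
        self.stream.disconnect()


class FakeStream(object):
    """Hypothetical stream whose listen blocks until disconnect is called."""

    def __init__(self):
        self.stopped = threading.Event()

    def listen(self):
        self.stopped.wait()          # block like a streaming read

    def disconnect(self):
        self.stopped.set()


stream = FakeStream()
thread = ListenThreadSketch(stream)
thread.daemon = True
thread.start()
thread.close()
thread.join(timeout=1)
print(thread.is_alive())  # False
```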
>>> thread = ListenThread(stream)
>>> stream == thread.stream
True

close()

Sets flags in the thread’s stream object that will cause its reading and connection loops to exit. This will eventually cause the thread to exit. It can be restarted via the object’s restart method.

>>> thread = ListenThread(stream)
>>> thread.close()

connection_healthy

Returns True if the thread’s stream object has not yet initialized its connection, has a healthy connection, or is still attempting to establish one. Returns False if connecting failed and all allotted retry attempts have been exhausted.
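The decision can be sketched as a function of the stream’s state. The connection_healthy function and its parameters are hypothetical; it assumes that exhausted retries mean error_count has reached retry_limit, as in the retry_ok example.

```python
def connection_healthy(started, connected, error_count, retry_limit):
    """Hypothetical sketch of the connection_healthy decision."""
    if not started:
        return True                      # connection not initialized yet
    if connected:
        return True                      # connection is up
    return error_count < retry_limit     # still retrying, or out of attempts


print(connection_healthy(False, False, 0, 10))   # True: not started
print(connection_healthy(True, False, 3, 10))    # True: still retrying
print(connection_healthy(True, False, 10, 10))   # False: retries exhausted
```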

An uninitialized connection:

>>> thread = ListenThread(stream)
>>> thread.connection_healthy
True

A stream that has failed:

>>> thread = ListenThread(failed_stream)
>>> thread.connection_healthy
False

restart()

The restart method resets the stream object’s failure flags and returns a new ListenThread object that wraps the dead thread’s stream object. Use this method to retry a connection once the connection_healthy property starts returning False.
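Returning a new object is necessary because a Python threading.Thread can only be started once; calling start a second time raises RuntimeError. A sketch of the pattern, assuming the failure flags are cleared with reset_throttles; RestartableThread and FakeStream are hypothetical.

```python
import threading


class RestartableThread(threading.Thread):
    """Hypothetical sketch of ListenThread.restart."""

    def __init__(self, stream):
        threading.Thread.__init__(self)
        self.stream = stream

    def run(self):
        self.stream.listen()

    def restart(self):
        # Clear the failure state, then wrap the same stream in a new thread,
        # since this Thread object cannot be started again.
        self.stream.reset_throttles()
        return RestartableThread(self.stream)


class FakeStream(object):
    """Hypothetical stream that has already used up its retries."""

    def __init__(self):
        self.error_count = 5
        self.listens = 0

    def listen(self):
        self.listens += 1

    def reset_throttles(self):
        self.error_count = 0


old = RestartableThread(FakeStream())
old.start()
old.join()
try:
    old.start()
except RuntimeError:
    print('a finished thread cannot be started again')
new = old.restart()
new.start()
new.join()
print(new is not old, new.stream is old.stream, new.stream.error_count, new.stream.listens)
# True True 0 2
```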

>>> thread = ListenThread(failed_stream)
>>> thread.restart()
<ListenThread(..., ...)>

run()

Invokes the stream object’s listen method in the current thread. To run it in a new thread, call the object’s start method instead.

>>> thread = ListenThread(stream)
>>> thread.run() 
