Monitoring Streams and Automatically Reconnecting

class sitebucket.monitor.ListenThreadMonitor(follow, consumer, token, stream_with='user', parser=<sitebucket.parser.DefaultParser object>, *args, **kwargs)

The ListenThreadMonitor takes a follow list of any size, then creates and initializes the ListenThread objects and corresponding SiteStream objects needed to cover that list. By default, the monitor attempts to restart any connection that fails.

Keyword arguments:

  • follow – a list of users to follow, each of whom has authorized your app
  • stream_with – ‘user’ or ‘followings’. A value of ‘user’ causes the stream to return only data about actions taken by the users specified in follow. A value of ‘followings’ causes the stream to return data about each user’s followings (essentially their home timeline). Defaults to ‘user’.
  • consumer – a python-oauth2 Consumer object for the app
  • token – a python-oauth2 Token object for the app’s owner account.
  • parser – an instance of a class extending BaseParser that handles the data returned by the stream.

The monitor’s run method blocks, so invoke it via the start method if you want to run it in a separate thread.

To use, first import ListenThreadMonitor and oauth2:

>>> from sitebucket import ListenThreadMonitor
>>> import oauth2

Then generate your Consumer and Token objects:

>>> token = oauth2.Token('key', 'secret')
>>> consumer = oauth2.Consumer('key', 'secret')

And then instantiate your ListenThreadMonitor object:

>>> monitor = ListenThreadMonitor([1,2,3], consumer, token)

Calling run will start the streaming connections and block until you kill the process:

>>> monitor.run() 

Calling start will start the monitor loop in a separate thread:

>>> monitor.start() 

It can be killed later via the disconnect method:

>>> monitor.disconnect()

add_follows(follow, start=True)

Creates and adds new ListenThreads based on a specified follow list. Optionally starts the new threads.

  • follow – list of users to start following
  • start – (default: True) If true, start running the new threads.

>>> follow = range(1, FOLLOW_LIMIT*10+1)
>>> monitor = ListenThreadMonitor([], consumer, token)
>>> monitor.add_follows(follow, start=False)
>>> len(monitor.threads) >= len(follow)/FOLLOW_LIMIT
True
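
The arithmetic behind the example above can be illustrated with a minimal, standalone sketch. It does not use sitebucket: chunk_follows is a hypothetical helper, and the FOLLOW_LIMIT value of 100 is an assumed placeholder rather than the library's actual constant.

```python
# Assumed placeholder; the real limit is whatever sitebucket defines.
FOLLOW_LIMIT = 100


def chunk_follows(follow, limit=FOLLOW_LIMIT):
    """Split a follow list into consecutive groups of at most `limit` ids."""
    follow = list(follow)
    return [follow[i:i + limit] for i in range(0, len(follow), limit)]


# Ten times the limit needs ten groups, one per streaming connection.
groups = chunk_follows(range(1, FOLLOW_LIMIT * 10 + 1))
print(len(groups))
```

Each group would correspond to one ListenThread, which is why the number of threads grows with the size of the follow list.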

consolidate_streams()

Finds all streams that are following fewer users than the permitted maximum and consolidates them into the smallest possible number of streaming connections.

>>> monitor = ListenThreadMonitor([], consumer, token)
>>> monitor.add_follows([1,], start=False)
>>> monitor.add_follows([2,], start=False)
>>> monitor.add_follows([3,], start=False)
>>> len(monitor.threads)
3
>>> monitor.consolidate_streams()
>>> len(monitor.threads)
1
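
One way to picture the consolidation step is as a repacking of partially filled groups. The sketch below is an illustration under stated assumptions, not sitebucket's implementation: consolidate is a hypothetical function that works on plain lists of user ids instead of ListenThread objects.

```python
def consolidate(groups, limit):
    """Repack partially filled groups into as few groups as possible.

    Groups already at `limit` are kept untouched; ids from the remaining
    groups are pooled and split into new groups of at most `limit` ids.
    """
    full = [g for g in groups if len(g) >= limit]
    leftover = [uid for g in groups if len(g) < limit for uid in g]
    repacked = [leftover[i:i + limit] for i in range(0, len(leftover), limit)]
    return full + repacked


# Three single-user groups collapse into one, as in the doctest above.
print(len(consolidate([[1], [2], [3]], limit=100)))
```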

disconnect()

Sets the disconnect flag to True, which will cause the monitor’s loop to terminate.

>>> monitor = ListenThreadMonitor([], consumer, token)
>>> monitor.disconnect()
>>> monitor.run()

healthy_streams

Returns a list of all threads that are healthy, uninitialized, or connecting.

>>> monitor = ListenThreadMonitor([1], consumer, token)
>>> len(monitor.healthy_streams) == 1
True

nonfull_streams

Returns a list of threads that are following fewer than FOLLOW_LIMIT users.

>>> follow = range(1,FOLLOW_LIMIT+2)
>>> monitor = ListenThreadMonitor(follow, consumer, token)
>>> len(monitor.nonfull_streams) == 1
True

restart_unhealthy_streams()

Restarts all unhealthy streaming ListenThreads.

run()

Starts all threads and begins the monitoring loop. Invoke this via the object’s start method to run the monitor in a separate thread.

>>> monitor = ListenThreadMonitor([1, 2, 3], consumer, token)
>>> monitor.run() 
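
The relationship between run, start and disconnect can be sketched with the standard threading module. This is a simplified stand-in under stated assumptions, not the library's code: MonitorSketch, FakeWorker and poll_interval are hypothetical names introduced only for illustration.

```python
import threading
import time


class FakeWorker:
    """Hypothetical stand-in for a streaming thread that can fail."""

    def __init__(self, healthy):
        self.healthy = healthy
        self.restarts = 0

    def restart(self):
        self.restarts += 1
        self.healthy = True


class MonitorSketch(threading.Thread):
    """Polls workers and restarts unhealthy ones until disconnect() is called."""

    def __init__(self, workers, poll_interval=0.01):
        super().__init__(daemon=True)
        self.workers = workers
        self.poll_interval = poll_interval
        self._disconnect = threading.Event()

    def disconnect(self):
        # Setting the flag makes the loop in run() terminate.
        self._disconnect.set()

    def run(self):
        # Blocks the calling thread; start() runs this in a new thread instead.
        while not self._disconnect.is_set():
            for worker in self.workers:
                if not worker.healthy:
                    worker.restart()
            self._disconnect.wait(self.poll_interval)


worker = FakeWorker(healthy=False)
monitor = MonitorSketch([worker])
monitor.start()        # loop runs in the background
time.sleep(0.1)
monitor.disconnect()   # ask the loop to stop
monitor.join(timeout=2)
```

Using an Event rather than a bare boolean lets the loop wake up as soon as disconnect is called instead of sleeping out the full poll interval.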

unhealthy_streams

Returns a list of all threads with failed connections.

>>> monitor = ListenThreadMonitor([], consumer, token)
>>> monitor.unhealthy_streams
[]
>>> failed_thread = ListenThread(failed_stream)
>>> monitor.threads.append(failed_thread)
>>> monitor.unhealthy_streams[0] == failed_thread
True
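
The split between healthy_streams and unhealthy_streams amounts to filtering the thread list by connection state. The sketch below assumes a simple string state on each thread; the state names, FakeThread and HealthView are hypothetical and may not match how sitebucket tracks connection status.

```python
# Hypothetical state names, used only for this illustration.
UNINITIALIZED = "uninitialized"
CONNECTING = "connecting"
HEALTHY = "healthy"
FAILED = "failed"


class FakeThread:
    """Stand-in for a ListenThread that only carries a state."""

    def __init__(self, state):
        self.state = state


class HealthView:
    """Exposes two filtered views over the same list of threads."""

    def __init__(self, threads):
        self.threads = threads

    @property
    def healthy_streams(self):
        # Healthy, uninitialized and connecting threads all count as healthy.
        ok = (HEALTHY, UNINITIALIZED, CONNECTING)
        return [t for t in self.threads if t.state in ok]

    @property
    def unhealthy_streams(self):
        # Only threads whose connection has failed are unhealthy.
        return [t for t in self.threads if t.state == FAILED]


view = HealthView([FakeThread(HEALTHY), FakeThread(FAILED)])
print(len(view.healthy_streams), len(view.unhealthy_streams))
```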
