Sunday, August 12, 2012

Kugelbot - or what to do with a Raspberry Pi

With the Raspberry Pi board now up and running on the network, I needed something "reasonable" for it to do. Maybe an homage to the famous Trojan Room coffee pot camera - 20 years later, at a fraction of the cost? Hosting a download mirror for Raspberry Pi boot images on a Raspberry Pi? A probe for network performance monitoring? A Twitter robot which recites The Iliad 140 characters at a time?

Finally, I settled on a robot which reposts a summary of, and a link to, each of my public Google+ posts to my otherwise unused Twitter account.

In addition to Python 2.7, already included in the boot image, the script relies on a few third-party modules: tweepy, the Google API client (apiclient), gflags and daemon.

Reading public posts via the Google+ API requires no authentication, but a developer key is needed for quota tracking; it can be requested/registered here for any valid Google account. To access the Twitter API, a new app first needs to be registered here, after which a set of static OAuth credentials can be generated for the owner of the app. That is good enough in this case, as the robot only needs to access my own account. The robot also uses the Google URL shortener API (same client library and developer API key) to shorten the long-ish Google+ post URLs into something more appropriate for the spartan Twitter interface.

The following script is largely stitched together from the samples provided with the tweepy and Google API client packages. It uses a SQLite3 database to store the association between Google+ posts and tweets; the same database acts as a queue of pending tweets and as a way to detect new posts on Google+ through polling. The state of the system can be inspected at any time using the sqlite3 command-line interface (install with apt-get install sqlite3). The script can run as a daemon: with the default settings it wakes up after a random delay of 20 to 60 minutes (roughly every 40 minutes on average), checks for new Google+ posts and sends at most one tweet from the queue. Creating a 140-character tweet from the content of each post is done in a less than elegant way, by truncating to an ellipsis at one of a series of what might be considered phrase-terminating characters (punctuation or even white space). Generating more "engaging" and relevant snippets from a post might be an interesting exercise in natural language processing, but is a bit beyond the scope of a weekend project.
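
The queue can also be inspected from Python instead of the sqlite3 shell. The following is only a minimal sketch, assuming the posts table layout created by the script below, where an empty tweet_id marks a post that is still waiting to be tweeted. The helper names (make_demo_db, queue_status, next_pending) and the sample rows are made up for illustration and are not part of the robot itself.

```python
import sqlite3

# Same table layout as the one created by the script below.
SCHEMA = ('create table if not exists posts '
          '(post_id text, post_date text, tweet_id text, '
          'tweet_date text, content text)')


def make_demo_db():
    """Build an in-memory database with three made-up rows (hypothetical data)."""
    conn = sqlite3.connect(':memory:')
    conn.execute(SCHEMA)
    conn.executemany('INSERT INTO posts VALUES (?,?,?,?,?)', [
        # already tweeted: tweet_id and tweet_date are filled in
        ('post-a', '2012-08-01T10:00:00.000Z', '1001',
         '2012-08-12 09:00:00', 'first post'),
        # still pending: tweet_id is the empty string
        ('post-c', '2012-08-03T10:00:00.000Z', '', '', 'third post'),
        ('post-b', '2012-08-02T10:00:00.000Z', '', '', 'second post'),
    ])
    conn.commit()
    return conn


def queue_status(conn):
    """Return the tuple (pending, tweeted) of row counts."""
    c = conn.cursor()
    pending = c.execute(
        "SELECT COUNT(*) FROM posts WHERE tweet_id = ''").fetchone()[0]
    tweeted = c.execute(
        "SELECT COUNT(*) FROM posts WHERE tweet_id != ''").fetchone()[0]
    c.close()
    return pending, tweeted


def next_pending(conn):
    """Return (post_id, content) of the oldest pending post, or None."""
    c = conn.cursor()
    row = c.execute(
        "SELECT post_id, content FROM posts WHERE tweet_id = '' "
        "ORDER BY post_date LIMIT 1").fetchone()
    c.close()
    return row


if __name__ == '__main__':
    conn = make_demo_db()
    print(queue_status(conn))
    print(next_pending(conn))
```

Because the post dates are stored as ISO-style timestamp strings, ordering them as plain text also orders them chronologically, which is why the oldest pending post comes out first. To look at the real state, sqlite3.connect(':memory:') would be replaced by the path given in the --db flag (posts.db by default).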

Known to Twitter as "Kugelbot", this script running on the Raspberry Pi has been slowly tweeting its way through a backlog of 180 messages, in the process acquiring more followers in a day than I had before and getting the Twitter->Facebook auto-posting agent blacklisted by exceeding 50 posts in a day.

And once it gets to this post, it will reach a meta-moment: a robot posting its own source code...


#!/usr/bin/python
# -*- coding: utf-8 -*-

import apiclient.discovery
import daemon
import gflags
import HTMLParser
import logging
import logging.handlers
import os
import random
import sqlite3
import sys
import time
import tweepy

FLAGS = gflags.FLAGS

# The gflags module makes defining command-line options easy for
# applications. Run this program with the '--help' argument to see
# all the flags that it understands.
gflags.DEFINE_enum('logging_level', 'INFO',
    ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
    'Set the level of logging detail.')

gflags.DEFINE_string('api_key', 'xxx',
                    'Google API key')
gflags.DEFINE_string('user_id', 'xxx',
                     'Google+ user/profile ID')

gflags.DEFINE_string('db', 'posts.db',
                     'database of posts to tweet mappings')

gflags.DEFINE_string('pidfile', '',
                    'pidfile if process should run as daemon')

gflags.DEFINE_integer('sleep_time', 1200,
                      'min time between tweets')

class PostsDb(object):
  """
  SQLite database containing the G+ to tweet mapping state.
  """
  def __init__(self, dbname):
    self._conn = sqlite3.connect(dbname)
    c = self._conn.cursor()
    c.execute('create table if not exists posts (post_id text, post_date text, tweet_id text, tweet_date text, content text)')
    self._conn.commit()
    c.close()

  def insert(self, post_id, date, text):
    """
    Insert a new post to be sent to twitter.
    Return True if the post is new, False otherwise.
    """
    c = self._conn.cursor()
    if c.execute('SELECT post_id from posts where post_id=?', (post_id, )).fetchone():
      c.close()
      return False
    c.execute('INSERT INTO posts VALUES (?,?,?,?,?)', (post_id, date, '', '', text))
    self._conn.commit()
    c.close()
    return True

  def next(self):
    """
    Return the tuple of (post_id, text) for the oldest post which has not yet been tweeted.
    """
    c = self._conn.cursor()
    post = c.execute('''SELECT post_id, content FROM posts WHERE tweet_id = '' ORDER BY post_date LIMIT 1''').fetchone()
    c.close()
    return post

  def tweet(self, post, tweet_id, date):
    """
    Record a tweet in the database.
    """
    c = self._conn.cursor()
    c.execute('UPDATE posts SET tweet_id=?, tweet_date=? WHERE post_id=?', (tweet_id, date, post))
    self._conn.commit()
    c.close()


class MLStripper(HTMLParser.HTMLParser):
  """
  Trivial HTML parser, which returns only the text without any markup.
  """
  def __init__(self):
    self.reset()
    self.fed = []
  def handle_data(self, d):
    self.fed.append(d)
  def get_data(self):
    return ''.join(self.fed)

def strip_html(s):
  """
  Remove any HTML markup and coding/escaping.
  """
  if s:
    stripper = MLStripper()
    stripper.feed(s)
    s = stripper.get_data()
  if not s:
    return 'untitled'
  else:
    return s

def make_tweet(url, text):
  """
  Format a tweet with text, URL and static #gplus hash-tag. Shorten text to ellipsis, if nece..
  """
  tail = ' ' + url + ' #gplus'
  text_size = 140 - len(tail)
  text = strip_html(text)
  if len(text) > text_size:
    text = text[:text_size - 2]
    # shorten string to end in one of N characters and keep the shortest
    shortest = text
    for c in ('! ', '. ', '; ', ' - ', ' '):
      candidate = text.rsplit(c, 1)[0]
      if len(candidate) < len(shortest):
        shortest = candidate
    text = shortest + '..'
  return text + tail


def load_posts(db):
  """
  Traverse G+ stream for new public posts not yet in the database and shorten into tweets
  """
  gplus_service = apiclient.discovery.build("plus", "v1", developerKey=FLAGS.api_key)
  url_service = apiclient.discovery.build('urlshortener', 'v1', developerKey=FLAGS.api_key)

  # Public posts of a given G+ user (ID is number in profile URL)
  request = gplus_service.activities().list(
        userId=FLAGS.user_id, collection='public')

  while request is not None:
    activities_doc = request.execute()
    for item in activities_doc.get('items', []):
      shorturl = url_service.url().insert(body={'longUrl': item['url']}).execute()['id']
      content = item['object']['content']
      if item['title'].startswith('Reshared'):
        content = 'Reshared: ' + content
      tweet = make_tweet(shorturl, content)

      # insert new post and exit if it already exists
      if not db.insert(item['id'], item['published'], tweet):
        return
      logging.info('inserted %s: "%s"', item['published'], tweet)
    request = gplus_service.activities().list_next(request, activities_doc)


def tweet(db):
  """
  Send a single untweeted entry from the database to twitter account. 
  """
  # The consumer keys can be found on your application's Details
  # page located at https://dev.twitter.com/apps (under "OAuth settings")
  consumer_key='xxx'
  consumer_secret='xxx'
  
  # The access tokens can be found on your application's Details
  # page located at https://dev.twitter.com/apps (located 
  # under "Your access token")
  access_token='xxx'
  access_token_secret='xxx'
  
  # If there is no untweeted post, skip and do nothing
  post = db.next()
  if not post:
    return

  # API authentication with static OAuth access token
  auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
  auth.set_access_token(access_token, access_token_secret)
  api = tweepy.API(auth)

  tweet = api.update_status(post[1])

  logging.info('tweeted "%s"', tweet.text)
  db.tweet(post[0], tweet.id, tweet.created_at)

  
def main(argv):
  # Let the gflags module process the command-line arguments
  try:
    argv = FLAGS(argv)
  except gflags.FlagsError, e:
    print '%s\nUsage: %s ARGS\n%s' % (e, argv[0], FLAGS)
    sys.exit(1)
    
  # Set the logging according to the command-line flag and send logs to syslog
  logging.getLogger().setLevel(getattr(logging, FLAGS.logging_level))
  syslog = logging.handlers.SysLogHandler(address='/dev/log')
  syslog.setFormatter(logging.Formatter('kugelbot: %(levelname)s %(message)s'))
  logging.getLogger().addHandler(syslog)

  db = PostsDb(FLAGS.db)

  if FLAGS.pidfile:
    daemon.daemonize(FLAGS.pidfile)
    logging.info('daemonized with pidfile %s', FLAGS.pidfile)

  # Main loop - repeat forever
  while True:
    try:
      time.sleep(random.randint(FLAGS.sleep_time, FLAGS.sleep_time * 3))
      load_posts(db) 
      tweet(db) # One tweet only, please...
    except (KeyboardInterrupt, SystemExit):
      return
    except Exception:
      logging.exception('error in main loop')

if __name__ == '__main__':
  main(sys.argv)
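
To see what the shortening step in make_tweet above does to a long post, here is a standalone sketch of the same logic, without the HTML stripping. The helper name shorten, its limit parameter, and the sample text and URL are invented for illustration; the real script always works against the fixed 140-character limit.

```python
def shorten(text, url, limit=140):
    """Fit text, URL and the #gplus hash-tag into at most `limit` characters."""
    tail = ' ' + url + ' #gplus'
    text_size = limit - len(tail)
    if len(text) > text_size:
        # leave room for the two-dot ellipsis
        text = text[:text_size - 2]
        # cut at the last occurrence of each terminator, keep the shortest result
        shortest = text
        for c in ('! ', '. ', '; ', ' - ', ' '):
            candidate = text.rsplit(c, 1)[0]
            if len(candidate) < len(shortest):
                shortest = candidate
        text = shortest + '..'
    return text + tail


if __name__ == '__main__':
    # A deliberately small limit makes the effect easy to follow by hand.
    print(shorten('Hello world. This is a long post about robots',
                  'http://goo.gl/x', limit=40))
    # prints: Hello world.. http://goo.gl/x #gplus
    print(shorten('Short post', 'http://goo.gl/x'))
    # prints: Short post http://goo.gl/x #gplus
```

In the first call the text is first cut to 'Hello world. Th'. Splitting at the last '. ' leaves 'Hello world', which is shorter than the 'Hello world.' left by splitting at the last space, so the sentence boundary wins. Keeping the shortest candidate therefore favours a clean phrase ending over squeezing in as many characters as possible.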