Thursday, March 21, 2013

Kafka tips

Three tips on using Kafka:
  1. Use buffer.remaining() instead of buffer.array().

    Kafka's message.payload() returns a ByteBuffer, and if you need access to the raw bytes, it's tempting to just call message.payload().array(), but that would be the wrong thing to do. That returns the backing array of the ByteBuffer, which may be (and in the case of Kafka usually is) larger than the data it represents, with offsets and limits maintained by ByteBuffer.

    Sadly, there is no single-line solution. Below is the most concise way to get the raw bytes out of a Kafka message and into a plain vanilla byte[] array.

    ByteBuffer buffer = message.payload();
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
  2. If sending Avro over Kafka, then serialize using EncoderFactory instead of DataFileWriter with ByteArrayOutputStream, for two reasons:

    • DataFileWriter writes out an entire Avro file, complete with schema. Sending the schema with every Kafka message is a waste.
    • If you plan to consume Kafka messages with Camus, then you need to use EncoderFactory because Camus uses DecoderFactory.
  3. encoder.flush()

    If sending Avro over Kafka and using EncoderFactory, then don't forget to call encoder.flush() after writing, as in this Avro Hello World example.
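
To make tip 1 concrete, here is a minimal sketch that needs no Kafka at all. It fakes a payload by wrapping a slice of a larger array, which is an assumption standing in for what message.payload() hands back. The class name PayloadBytes and the helper toBytes() are made up for illustration. The helper copies from a duplicate() so the caller's buffer position is left alone, which is a design choice of this sketch rather than anything Kafka requires.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class PayloadBytes {
    // Copies only the readable region [position, limit) into a fresh array.
    // Reading through duplicate() leaves the caller's position untouched.
    static byte[] toBytes(ByteBuffer buffer) {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.duplicate().get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        // Stand-in for a Kafka payload: a 5-byte message inside a 13-byte backing array.
        byte[] backing = "xxxxhelloyyyy".getBytes(StandardCharsets.US_ASCII);
        ByteBuffer payload = ByteBuffer.wrap(backing, 4, 5);

        // array() exposes the whole backing array, padding included.
        System.out.println(payload.array().length);  // prints 13

        // remaining() sizes the copy to just the message.
        System.out.println(new String(toBytes(payload), StandardCharsets.US_ASCII));  // prints hello
    }
}
```

If you don't care about preserving the buffer's position, calling buffer.get(bytes) directly on the payload works just as well.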

Saturday, March 9, 2013

Automatically deskew before machine learning in R

A common data pre-processing step in R is to deskew data: if a histogram shows a lopsided distribution, apply a function such as log() before fitting a model. With a large number of columns, it is tedious to eyeball each histogram and manually replace the offending columns with their log() counterparts.

Helpfully, the e1071 package (notable for its support vector machine algorithms) provides a handy function to measure the skewness of data, called skewness(). Below is a function to automatically deskew an entire range of columns of a data frame.
library(e1071)  # provides skewness()
deskew <- function(df, mincol=1, maxcol=ncol(df), threshold=1.10) {
  for (i in mincol:maxcol) {
    t <- log(1+df[[i]]-min(df[[i]]))  # shift so the minimum maps to log(1) = 0
    if (abs(skewness(df[[i]])) > threshold * abs(skewness(t)))
      df[[i]] <- t
  }
  df
}
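Deskewing data can improve the accuracy of linear models, both regular lm()/glm() and linear svm() support vector machines. Understandably, it doesn't help with tree-based models such as randomForest(), because splits on a predictor depend only on the ordering of its values, which a monotonic transform like log() preserves.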

Monday, March 4, 2013

Fixing Hive after Cloudera 4.2.0 update

The Cloudera 4.2.0 update breaks Hive, producing errors such as
* Required table missing : "`SKEWED_STRING_LIST`" in Catalog "" Schema "". DataNucleus requires this table to perform its persistence operations. Either your MetaData is incorrect, or you need to enable "datanucleus.autoCreateTables")
This happens because the deployment script doesn't call the Hive metastore upgrade script.

To manually upgrade the Hive metastore from the 0.9 schema to 0.10 (assuming you are using MySQL for the Hive metastore as recommended by Cloudera):
cd /usr/lib/hive/scripts/metastore/upgrade/mysql
sudo mysql --user=root
use metastore;
source upgrade-0.9.0-to-0.10.0.mysql.sql

Friday, March 1, 2013

Strata 2013 Santa Clara trip report

Strata, the O'Reilly conference on Big Data, just wrapped up Feb 26-28, 2013 in Santa Clara. The story of 2013 is that we're entering the post-Hadoop era. Read all the juicy details about Big Data, Data Science, and Visualization in my daily trip reports:

Strata Trip Report: Days 0 & 1
Strata Trip Report: Day 2
Strata Trip Report: Day 3