Use buffer.remaining() instead of buffer.array().
Kafka's message.payload() returns a ByteBuffer, and if you need access to the raw bytes, it's tempting to just call message.payload().array(), but that would be the wrong thing to do. That call returns the ByteBuffer's backing array, which may be (and in the case of Kafka usually is) larger than the data it represents; the ByteBuffer tracks the valid region with its own offset, position, and limit.
Sadly, there is no single-line solution. Below is the most concise way to get the raw bytes out of a Kafka message and into a plain vanilla byte[] array.
    ByteBuffer buffer = message.payload();
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
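To see the difference without a running Kafka cluster, here is a minimal JDK-only sketch. The class PayloadBytes and the helper samplePayload() are hypothetical names invented for illustration; samplePayload() merely imitates a payload that is a small window into a larger backing array. The duplicate() call is an addition of this sketch so that the caller's buffer position is left untouched.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class PayloadBytes {
    // Copy exactly the readable bytes (position up to limit) into a fresh array.
    static byte[] toBytes(ByteBuffer buffer) {
        // duplicate() shares the content but has its own position,
        // so reading from the view does not consume the caller's buffer.
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    // Hypothetical stand-in for message.payload(): a 5-byte window
    // into a larger backing array.
    static ByteBuffer samplePayload() {
        byte[] backing = "HEADERhelloTRAILER".getBytes(StandardCharsets.US_ASCII);
        return ByteBuffer.wrap(backing, 6, 5).slice();
    }

    public static void main(String[] args) {
        ByteBuffer payload = samplePayload();
        // array() exposes the whole backing array, not just the payload.
        System.out.println("array().length = " + payload.array().length);
        System.out.println("remaining()    = " + payload.remaining());
        System.out.println("payload        = "
                + new String(toBytes(payload), StandardCharsets.US_ASCII));
    }
}
```

If consuming the buffer is acceptable, the duplicate() step can be dropped, which gives the three-line version shown earlier.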
If sending Avro over Kafka, then serialize using EncoderFactory instead of DataFileWriter with ByteArrayOutputStream, for two reasons:
- DataFileWriter writes out an entire Avro file, complete with schema. Sending the schema with every Kafka message is a waste.
- If you plan to consume Kafka messages with Camus, then you need to use EncoderFactory because Camus uses DecoderFactory.
encoder.flush()
If sending Avro over Kafka and using EncoderFactory, then don't forget to call encoder.flush() after writing, as in this Avro Hello World example.
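The reason flush() matters is that the binary encoder returned by EncoderFactory may buffer its output, so bytes may not reach the underlying stream until it is flushed. The sketch below is not Avro code: it uses the JDK's BufferedOutputStream as a stand-in to show the same failure mode, and the class name FlushDemo is made up for illustration.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class FlushDemo {
    // Returns { bytes visible in the sink before flush, bytes visible after flush }.
    static int[] sizesAroundFlush(byte[] data) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // Stand-in for a buffering encoder wrapped around the sink.
        BufferedOutputStream buffered = new BufferedOutputStream(sink);
        try {
            buffered.write(data);
            int before = sink.size();   // small writes are still held in the buffer
            buffered.flush();
            int after = sink.size();    // now the sink has received the data
            return new int[] { before, after };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] sizes = sizesAroundFlush(new byte[] { 1, 2, 3 });
        System.out.println("before flush: " + sizes[0] + " bytes");
        System.out.println("after flush:  " + sizes[1] + " bytes");
    }
}
```

With Avro the symptom is the same: calling toByteArray() on the ByteArrayOutputStream before encoder.flush() can yield an empty or truncated message.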
Thursday, March 21, 2013
Kafka tips
Three tips on using Kafka:
Saturday, March 9, 2013
Automatically deskew before machine learning in R
A common data pre-processing step in R is to deskew data: if a histogram shows a lopsided distribution, apply a function such as log() before fitting a model. With a large number of columns, it is tedious to eyeball each histogram and manually replace the offending columns with their log() counterparts.
Helpfully, the e1071 package (notable for its support vector machine algorithms) provides a handy function, skewness(), to measure the skewness of data. Below is a function to automatically deskew an entire range of columns of a data frame.
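    library(e1071)  # provides skewness()

    deskew <- function(df, mincol=1, maxcol=ncol(df), threshold=1.10) {
      for (i in mincol:maxcol) {
        # shift the column so its minimum maps to log(1) = 0, then take the log
        t <- log(1+df[[i]]-min(df[[i]]))
        # keep the transform only if it cuts the absolute skewness by more than the threshold factor
        if (abs(skewness(df[[i]])) > threshold * abs(skewness(t)))
          df[[i]] <- t
      }
      df
    }

Deskewing data improves the performance of linear models, both regular lm()/glm() and linear svm() support vector machines. Understandably, it doesn't help with decision-tree models such as randomForest().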
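For readers who want to see the decision rule worked through outside R, here is a rough Java sketch for a single column. It is not the R function itself: the class and method names are invented for illustration, and skewness() here assumes the formula e1071 uses by default (its "type 3": the moment-based skewness scaled by ((n-1)/n)^(3/2)).

```java
final class Deskew {
    // Sample skewness, assuming e1071's default "type 3" definition.
    static double skewness(double[] x) {
        int n = x.length;
        double mean = 0;
        for (double v : x) mean += v;
        mean /= n;
        double sumSq = 0, sumCube = 0;
        for (double v : x) {
            double d = v - mean;
            sumSq += d * d;
            sumCube += d * d * d;
        }
        double g1 = Math.sqrt(n) * sumCube / Math.pow(sumSq, 1.5);
        return g1 * Math.pow(1.0 - 1.0 / n, 1.5);
    }

    // log(1 + x - min(x)): the same shift-and-log transform as the R code.
    static double[] logShift(double[] x) {
        double min = Double.POSITIVE_INFINITY;
        for (double v : x) min = Math.min(min, v);
        double[] t = new double[x.length];
        for (int i = 0; i < x.length; i++) t[i] = Math.log(1 + x[i] - min);
        return t;
    }

    // Return the transformed column only if it is sufficiently less skewed;
    // otherwise return the original array unchanged.
    static double[] deskewColumn(double[] x, double threshold) {
        double[] t = logShift(x);
        boolean lessSkewed = Math.abs(skewness(x)) > threshold * Math.abs(skewness(t));
        return lessSkewed ? t : x;
    }

    public static void main(String[] args) {
        double[] lopsided = { 1, 2, 3, 4, 100 };   // long right tail
        double[] symmetric = { 1, 2, 3, 4, 5 };    // skewness of zero
        System.out.println("lopsided replaced:  " + (deskewColumn(lopsided, 1.10) != lopsided));
        System.out.println("symmetric replaced: " + (deskewColumn(symmetric, 1.10) != symmetric));
    }
}
```

The threshold of 1.10 means a column is replaced only when the log transform reduces the absolute skewness by more than roughly 10%, which keeps already-symmetric columns untouched.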
Monday, March 4, 2013
Fixing Hive after Cloudera 4.20 update
The Cloudera 4.20 update breaks Hive, producing errors such as
org.datanucleus.store.rdbms.exceptions.MissingTableException: Required table missing : "`SKEWED_STRING_LIST`" in Catalog "" Schema "". DataNucleus requires this table to perform its persistence operations. Either your MetaData is incorrect, or you need to enable "datanucleus.autoCreateTables"
This is because the deployment script doesn't call the Hive metastore upgrade script.
To manually upgrade the Hive metastore from the 0.9 schema to 0.10 (assuming you are using MySQL for the Hive metastore as recommended by Cloudera):
    cd /usr/lib/hive/scripts/metastore/upgrade/mysql
    sudo mysql --user=root
    use metastore;
    source upgrade-0.9.0-to-0.10.0.mysql.sql
Friday, March 1, 2013
Strata 2013 Santa Clara trip report
Strata, the O'Reilly conference on Big Data, just wrapped up Feb 26-28, 2013 in Santa Clara. The story of 2013 is that we're entering the post-Hadoop era. Read all the juicy details about Big Data, Data Science, and Visualization in my daily trip reports:
Strata Trip Report: Days 0 & 1
Strata Trip Report: Day 2
Strata Trip Report: Day 3