Use buffer.remaining() instead of buffer.array().
Kafka's message.payload() returns a ByteBuffer, and if you need access to the raw bytes, it's tempting to just call message.payload().array(), but that would be the wrong thing to do. That returns the backing array to the ByteBuffer, which may (and in the case of Kafka usually is) larger than the data it represents, with offsets and limits maintained by ByteBuffer.
Sadly, there is no single-line solution. Below is the most concise way to get the raw bytes out of a Kafka message and into a plain vanilla byte[] array.
ByteBuffer buffer = message.payload(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes);
-
If sending Avro over Kafka, then serialize using EncoderFactory instead of DataFileWriter with ByteArrayOutputStream, for two reasons:
- DataFileWriter writes out an entire Avro file, complete with schema. Sending the schema with every Kafka message is a waste.
- If you plan to consume Kafka messages with Camus, then you need to use EncoderFactory because Camus uses DecoderFactory.
encoder.flush()
If sending Avro over Kafka and using EncoderFactory, then don't forget to call encoder.flush() after writing, as in this Avro Hello World example.
Thursday, March 21, 2013
Kafka tips
Three tips on using Kafka:
Subscribe to:
Post Comments (Atom)
1 comment:
Hi!
First of all, thanks for your posts.
I'm wondering about the implications of using message.payload.array(), as this is what I've been doing for a while and (as far as I can tell) without problem.
The way I'm doing it is that I'm instantiating a String which contains the relevant (payload) part of the Kafka message in the following way:
new String(message.payload.array(), Message.payloadOffset(message.magic), message.payloadSize)
i.e.: this constructor: http://docs.oracle.com/javase/6/docs/api/java/lang/String.html#String(byte[], int, int)
If my understanding is correct, this uses the underlying Array[Byte] but starts and ends at proper indexes using payloadOffset() and payloadSize.
Do you envision any pitfalls with this approach? Am I missing anything?
N.B.: The above code is scala, hence why some of the missing double parenthesis ().
Post a Comment