Wednesday, October 26, 2016

Drizzle Brings Low-Latency Streaming to Spark; but RISE Lab is Just a Change in Funding


This morning at Spark Summit Europe 2016, Ion Stoica announced the Drizzle project during his keynote. Drizzle promises to reduce streaming data latency in Spark to below that of Flink and Storm. Ion announced it in the context of the new RISE Lab at UC Berkeley.

Drizzle is an exciting and important new technology; RISE Lab is simply a change in funding at Berkeley. In fact, Drizzle was announced at Spark Summit (West) this past summer in the context of AMPLab, not RISE Lab.

Stoica also repeated the common wisdom that Spark came out of AMPLab, but in fact Matei Zaharia's first paper on Spark and RDDs came out in 2010 under RAD Lab, the funding model that preceded AMPLab.

These changes, from RAD Lab to AMPLab to RISE Lab, are just changes in funding. The important things -- the people and the projects -- carry through. And Drizzle is an important project. By making streaming tasks long-lived on Spark workers -- as opposed to launching a fresh Spark job for every micro-batch, as today's Spark Streaming does -- latency and resiliency are vastly improved. Latency is reported to be better than Flink's, but keep in mind that the comparison is between a research project and something available today to put into production. Flink might improve further by the time Drizzle is released (I don't think the Drizzle code is even available to download and try out yet).

To watch Ion's keynote, go to about 1:15:00 at http://livestream.com/fourstream/sparksummiteu16-tracka/videos/140168779

For more meaty details on Drizzle, see the Spark Summit (West) 2016 presentation Low Latency Execution for Apache Spark.

Saturday, August 27, 2016

Installation Quickstart: TensorFlow, Anaconda, Jupyter


What better way to start getting into TensorFlow than with a notebook technology like Jupyter, the successor to IPython Notebook? There are two little hurdles to clear along the way:
  1. Choice of OS. Trying to use Windows with TensorFlow is as painful as trying to use Windows with Spark. But even within Linux, it turns out you need a recent version. CentOS 7 works a lot better than CentOS 6 because it has a more recent glibc.
  2. A step is missing from the TensorFlow installation page. From StackOverflow:
    conda install notebook ipykernel
Here, then, is the complete set of steps to achieve Hello World in TensorFlow on Jupyter via Anaconda:
  1. Use CentOS 7.2 (aka 1511), for example using VirtualBox if under Windows. This step may be unnecessary if you use OSX, but I just haven't tried it.
  2. Download and install Anaconda for Python 3.5.
  3. From the TensorFlow installation instructions:
    conda create -n tensorflow python=3.5
    source activate tensorflow
    conda install -c conda-forge tensorflow
  4. From StackOverflow:
    conda install notebook ipykernel
  5. Launch Jupyter:
    jupyter notebook
  6. Create a notebook and type in Hello World:
    import tensorflow as tf
    hello = tf.constant('Hello, TensorFlow!')
    sess = tf.Session()
    print(sess.run(hello))

Saturday, July 30, 2016

900TB, 4U, $60k


The impasse of Peak Hard Drive that I identified two years ago -- that era when 4TB hard drives had been the top capacity for three years straight -- has been broken, thanks to advances in both hard drives (HDDs) and Solid State Disks (SSDs). 10TB 3.5" HDDs can now be ordered from Amazon for less than $600, and the density of SSDs is skyrocketing, as manufacturers promised two years ago. At $10,000, the 2.5" 15TB SSD is indeed dense, but at $670/TB it is twice as expensive per terabyte as enterprise 1TB SSDs.
Going forward, Toshiba is predicting 128TB SSDs by 2018 and 40TB HDDs by 2020. As shown in the chart above, SSDs would then firmly breach the long-term exponential growth line in density, while HDDs would merely continue their anemic density growth.

Rack chassis are getting more dense as well. SuperMicro now has a 4U chassis that can hold 90 3.5" HDDs.

Filled with the 10TB HDDs, that would be about $60k for 900TB storage in 4U of rack space. There is no similar solution I've found for 2.5" drives, so using the same chassis for the 15TB SSDs would be $900k for 1.4PB.
But of course that ignores the compute side of the equation. Such ultra-density in a rack would only be suitable for deep-freeze storage. For a Hadoop/Spark application to leverage data locality, a 1U chassis that accommodates 8 2.5" drives and dual processors from SuperMicro would be more appropriate. Populated with the 15TB SSDs, that would be 120TB/1U, or 960TB/8U. If each CPU socket were populated with 22-core Xeons, that would be a total of 352 cores in that 8U. Total cost would be about $750k for that 8U.
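For the curious, here is a quick back-of-the-envelope check of the arithmetic above, as a Scala sketch (drive prices as quoted above; chassis and server costs are rough approximations):

val hddTB = 10;  val hddPrice = 600      // 10TB 3.5" HDD at ~$600
val ssdTB = 15;  val ssdPrice = 10000    // 15TB 2.5" SSD at ~$10,000

// 90-bay 4U chassis filled with 10TB HDDs
println(90 * hddTB)      // 900 TB raw
println(90 * hddPrice)   // $54,000 in drives; roughly $60k with the chassis

// the same 90 bays hypothetically filled with 15TB SSDs
println(90 * ssdTB)      // 1350 TB, about 1.4 PB
println(90 * ssdPrice)   // $900,000

// 8 x 1U servers, each with 8 2.5" bays and dual 22-core Xeons
println(8 * 8 * ssdTB)   // 960 TB in 8U
println(8 * 2 * 22)      // 352 cores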

Monday, July 18, 2016

Spark Sometimes Forgets to Put Distantly Scoped Variables into Your Closure

This has always been a gotcha with Spark and still is today, yet I don't see a caution of it mentioned much of anywhere.

If your .map() needs access to a variable (or value), and that variable is not defined in the same immediate local scope, "sometimes" Spark will not include it in the closure, leading to erroneous results.

I've never been able to define "sometimes", and I've never been able to come up with a tiny example that demonstrates it. Nevertheless, below is a tiny bit of source code (which does work; that is, it does not demonstrate the problem) just to make clear what I'm talking about.

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object closure {
  val a = Array(1,2)
  def main(args: Array[String]) {
    // Sometimes the line of code below is necessary (and change the
    // reference to a in the map() to a2 as well)
    // val a2 = a
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("closure"))
    println(sc.makeRDD(Array(3,4)).map(_ + a.sum).collect.mkString(";"))
    sc.stop
  }
}

I've seen cases where the array a "sometimes" gets transmitted to the cluster as a zero-length array.

As background, functional languages like Scala compute closures, which means that when you pass a function as a parameter, it's not just the function that gets passed but all the variables and values that it requires along with it. Scala does compute closures, but it does not serialize closures for distributed computing. Spark has to compute and serialize its own closures, and sometimes it makes mistakes. Sometimes, it's necessary to give it some help by moving the data you need into the same local scope so that it can pick it up.
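For clarity, here is a minimal sketch of the workaround hinted at in the comments above (the object name closureWorkaround is just illustrative): copy the distantly scoped value into a local val and refer only to that inside the closure.

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object closureWorkaround {
  val a = Array(1,2)
  def main(args: Array[String]) {
    // Copy the distantly scoped value into the immediate local scope
    // so that Spark's closure computation is sure to capture it.
    val a2 = a
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("closureWorkaround"))
    // Refer only to a2 (never a) inside the closure passed to map()
    println(sc.makeRDD(Array(3,4)).map(_ + a2.sum).collect.mkString(";"))
    sc.stop
  }
}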

Saturday, June 11, 2016

Spark Summit 2016 Review


Spark Summit (West) 2016 took place this past week in San Francisco, with the big news of course being Spark 2.0, which among other things ushers in yet another 10x performance improvement through whole-stage code generation.
This is on top of the 2x performance improvement going from RDDs to 1.4 Dataframes, the 3.5x improvement going from 1.4 Dataframes to 1.5 Dataframes, and the miscellaneous improvements in Spark 1.6 including automatic cache vs. execution memory balancing. Overall, this is perhaps a 100x improvement from Spark 0.9 RDDs (July 2014) to Spark 2.0 Dataframes (July 2016).
And that 100x is on top of the improvement over Hadoop's disk-based MapReduce, which itself was another 100x speedup. So combined, that's a 10,000x speedup from disk-based Hadoop MapReduce to memory-based Spark 2.0 Dataframes.

Overhyped?

During a panel at Spark Summit, a question was put to panelist Thomas Dinsmore as to whether Spark has been overhyped. The day before, I had actually met up with Thomas in the Speaker's lounge and he wanted to get the input from others as to the answer to this question. My response was: Spark might have been overhyped during the 1.x days, but with Spark 2.0 it's caught up to the hype generated during the 1.0 days.
The mantra with Spark has always been: it's in-memory so it's fast -- with an unstated implication that it's not possible to go any faster than that. Well, as we've seen, it was possible to go faster -- 100x as fast. Spark 2.0 delivers on the Spark 1.0 buzz. Now, Spark 1.x was certainly useful, and it's not like there was anything faster at the time for clusters of commodity hardware, but Spark 1.x carried with it a buzz that didn't get fully realized until Spark 2.0.
In another sign of maturity, Spark 2.0 was not rushed out the door prior to the Summit. This is in contrast to Spark 1.0, which two years ago I criticized as not being stable enough for a 1.0 moniker. (I think Spark 1.2 was really the 1.0 of Spark.)

Graphs

The next-most obvious thing at the Summit was the proliferation of graph talks, including a keynote by Capital One (although they used an external graph database rather than GraphX or GraphFrames) and of course my own talk. But besides these, Ankur Dave gave two talks, one on Tegra and one on GraphFrames. Tegra is interesting because it introduces to Spark, for the first time, graphs that can grow. It's research work built upon GraphX, but hopefully that technology will get added to GraphFrames.
Besides these four talks, there was yet another presentation, this one from Salesforce, on threat detection.

Crowds

The third-most obvious thing to note about Spark Summit 2016 was the crowds. There were 2500 attendees this year compared to last year's 1500. I've attended some crowded events in my lifetime -- including the 1985 Beach Boys concert on the National Mall (lawn) attended by 800,000 and the 1993 papal Mass in Denver's Cherry Creek State Park attended by 500,000 -- but neither was as crowded as Spark Summit 2016. I'm just glad I was able to hide out in the Speaker lounge during mealtimes. And I feel sorry for the vendors, as the vendor expo this time had to be shunted off to a separate tower of the hotel conference center.
It seems they're going to need to Moscone this next time.

Research

There were five parallel tracks this time compared to three tracks a year ago. I spent most of my time in the "research" track, which is one of the two new tracks. And there were a lot of interesting talks there:
  • Yggdrasil: Faster Decision Trees Using Column Partitioning in Spark
  • Low-latency Execution for Apache Spark - a modification to Spark whereby the number of communication round trips to the driver is minimized.
  • Re-architecting Spark for Performance Understandability - a rewrite of Spark such that each task consumes only one type of resource (CPU, disk, or network) so that performance and bottlenecks can be visualized as well as be reasoned about by a scheduler. Although Kay Ousterhout's previous scheduling work, Sparrow, never made it into Spark, my hunch is that this eventually will. My hunch is that at the time three years ago, there were so many other ways to improve performance (such as Tungsten which as noted above has given a 100x performance improvement through Spark 2.0) that the cost/benefit at the time for incorporating Sparrow wouldn't have been worth it. But now that other avenues have been maxed out, the time is ripe for scheduler improvements.

Corporate Support

The final message was just all the big name vendors jumping on board: Microsoft, IBM, Vertica. I remember back when even Cloudera refused to support Spark (in early 2013).

Saturday, May 14, 2016

Structured Streaming for Lambda Architecture in Spark, But You'll Have to Wait for It



Some have the misconception that Lambda Architecture just means you have separate paths for batch and realtime. They miss a key part of Lambda Architecture: the ability to query a unified view of both batch and realtime.
Structured Streaming, also known as Structured Dataframes, will provide a critical piece: the ability to stream directly into a Dataframe, which can then of course be queried with SQL.
To provide the unified view, it will probably be possible to join such a Streaming Dataframe containing the realtime data with an ORC-backed Dataframe containing the historical data. However, as of today (May 14, 2016), the only two data sources available to populate a Streaming Dataframe are memory and file. Notably absent are streaming sources such as Apache Kafka, and last week Michael Armbrust indicated support for non-file data sources might come after Spark 2.0. And then this week Reynold Xin advised:
stay tuned to this blog for more details on Structured Streaming in Spark 2.0, including details on what is possible in this release and what is on the roadmap for the near future
There are still key adds in Spark 2.0: full SQL support including subqueries, and yet another 10x performance improvement due to "Tungsten 2.0" (on top of the 2x-10x improvement Tungsten brought over Spark 1.4, 1.5, and 1.6). Currently, Druid is still the reigning champ when it comes to Lambda in a Box. But Spark will likely take that crown before the end of this year.
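To make the unified-view idea above concrete, here is a rough sketch of what joining a streaming DataFrame with historical data might look like, written against the Structured Streaming API shape being previewed for Spark 2.0 (so not runnable against anything released today; the paths and the userId join column are hypothetical):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("lambda").getOrCreate()

// Historical (batch) data, ORC-backed
val historical = spark.read.orc("/data/history")

// Realtime data arriving as files in a directory (file and memory being
// the only streaming sources available as of this writing)
val realtime = spark.readStream
  .schema(historical.schema)
  .json("/data/incoming")

// Unified view: join the streaming DataFrame with the historical one
val unified = realtime.join(historical, "userId")

// Continuously materialize the result to an in-memory table so it can
// be queried with ordinary SQL
val query = unified.writeStream
  .format("memory")
  .queryName("unified_view")
  .start()

spark.sql("SELECT count(*) FROM unified_view").show()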

Thursday, April 7, 2016

Declarative Machine Learning

SQL is commonly referred to as a 4GL, or fourth-generation programming language, as opposed to all of the 3GL's like Java, C++, Python, Scala, etc. SQL is referred to as a declarative language as opposed to an imperative language like the 3GL's. You tell SQL what to do, not how to do it.
Well, TuPAQ is the SQL for machine learning. You give it a high-level goal, and it figures out which machine learning algorithm to use, and tunes the hyperparameters for you. Example code for speech-to-text translation from Evan Sparks et al:
SELECT vm.sender, vm.arrived,
PREDICT(vm.text, vm.audio)
GIVEN LabeledVoiceMails
FROM VoiceMails vm
WHERE vm.user = 'Bob' AND vm.listened is NULL
ORDER BY vm.arrived
DESC LIMIT 50
When will you be able to use this in production? Hopefully, it's not too far away -- maybe a year, as a wild guess. At Spark Summit in June 2015, Evan Sparks indicated KeystoneML would "soon" integrate with TuPAQ, as both KeystoneML and TuPAQ are AMPLab projects.
Although I gave KeystoneML a tepid review when it first came out, the new 0.3 version announced last week shows the impressive direction they're headed in. Although not quite as declarative as TuPAQ, it is still declarative. An example of declaring a machine learning pipeline in KeystoneML:
val trainData = NewsGroupsDataLoader(sc, trainingDir)

val predictor = Trim andThen
    LowerCase() andThen
    Tokenizer() andThen
    NGramsFeaturizer(1 to conf.nGrams) andThen
    TermFrequency(x => 1) andThen
    (CommonSparseFeatures(conf.commonFeatures), trainData.data) andThen
    (NaiveBayesEstimator(numClasses), trainData.data, trainData.labels) andThen
    MaxClassifier
Sure, the spark.ml package from Spark MLlib is also pipeline-centric, but whereas spark.ml simply relies on DataFrames/Catalyst/Tungsten to optimize each stage of the pipeline, KeystoneML analyzes and optimizes the pipeline as a whole. It "inspects the pipeline DAG and automatically decides where to cache intermediate output using a greedy algorithm."
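For comparison, here is a rough sketch of an analogous pipeline in spark.ml (the column names and trainingDataFrame are hypothetical); each stage here is optimized individually via DataFrames/Catalyst/Tungsten rather than the pipeline as a whole:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

// Assumes trainingDataFrame has "text" and "label" columns
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val tf = new HashingTF().setInputCol("words").setOutputCol("features")
val nb = new NaiveBayes()

val pipeline = new Pipeline().setStages(Array(tokenizer, tf, nb))
val model = pipeline.fit(trainingDataFrame)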
Are there other declarative machine learning systems out there? Apache SystemML claims to be declarative, but only in the sense that it automatically plans deployment to a cluster based on data locality, available memory, etc.
SystemML claims that the high-level languages it provides, DML and PyDML, are "declarative", but they are not. They are still imperative languages. Their purpose is to allow non-Spark developers to write machine learning programs in languages they are comfortable in (like Python), yet be able to compile down to Spark Scala when the time comes to deploy to production. Thus, these are high-level languages as SystemML claims, but they are still imperative and not declarative. The ability of SystemML to plan optimal deployment to a cluster, however, is declarative.

Tuesday, March 29, 2016

Table of XX2Vec Algorithms

XX2Vec     Embed             In        Sup/Unsup     Algorithms used
Char2Vec   Character         Sentence  Unsupervised  CNN -> LSTM
Word2Vec   Word              Sentence  Unsupervised  ANN
GloVe      Word              Sentence  Unsupervised  SGD
Doc2Vec    Paragraph Vector  Document  Supervised    ANN -> Logistic Regression
Image2Vec  Image Elements    Image     Unsupervised  DNN
Video2Vec  Video Elements    Video     Supervised    CNN -> MLP
The powerful word2vec algorithm has inspired a host of other algorithms listed in the table above. (For a description of word2vec, see my Spark Summit 2015 presentation.) word2vec is a convenient way to assign vectors to words, and of course vectors are the currency of machine learning. Once you've vectorized your data, you are then free to apply any number of machine learning algorithms.
word2vec is able to come up with vectors by leveraging the concept of embedding. In a corpus, a word appears in the context of surrounding words, and word2vec uses those co-occurrences to infer relationships between those words.
All of the XX2Vec algorithms listed in the table above assign vectors to X's, where those X's are embedded in some larger context Y.
But the similarities end there. Each XX2Vec algorithm not only goes about it through means suited for its domain, but their use cases aren't even analogous. Doc2Vec, for example, is supervised learning whereas most of the others are unsupervised learning. The goal of Doc2Vec is to be able to apply labels to documents, whereas the goal of Word2Vec and most of the other XX2Vec algorithms is simply to spit out vectors that you can then go and do other machine learning and analyses on (such as analogy detection).
Here is a brief description of each XX2Vec:

Char2Vec

Like word2vec but because it operates at the character level, it is much more tolerant of misspellings and thus better for analysis of tweets, user product reviews, etc.

Word2Vec

Described above. But one more note: it's one of those unreasonably effective algorithms -- a kind of getting lucky, if you will.
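As a concrete illustration of getting vectors out of word2vec, here is a minimal sketch using Spark MLlib's Word2Vec (the corpus path is hypothetical):

import org.apache.spark.mllib.feature.Word2Vec

// Tokenized sentences: an RDD[Seq[String]]
val corpus = sc.textFile("corpus.txt").map(_.toLowerCase.split(" ").toSeq)

val model = new Word2Vec().setVectorSize(100).fit(corpus)

// The resulting vectors are available for downstream ML and analyses,
// e.g. nearest neighbors in the embedding space
model.findSynonyms("spark", 5).foreach(println)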

GloVe

Instead of just getting lucky, there have been a number of efforts to ground the idea of word embeddings in something more mathematical than just pulling weights out of a neural network and hoping they work. GloVe is the current standard-bearer in this regard. Its model is designed from the ground up to support finding analogies, instead of just getting them by chance in word2vec.

Doc2Vec

Actually, Doc2Vec uses Word2Vec as a first pass. It then comes up with a composite vector for each sentence or paragraph from the contributing Word2Vec word vectors. This composite gives some kind of overall context to the sentence or paragraph, and then this composite vector is plopped down into the beginning of the sentence or paragraph as an "extra word". The paragraph vectors together with the word vectors are used to train a supervised-learning classifier using human labels of the documents.

Image2Vec

Whereas word2vec intentionally uses a shallow neural network, Image2Vec uses a deep neural network and composes the resultant vectors from the weights from multiple layers of the network. Image elements that might be represented by these weights include image fragments (grass, bird, fence, etc.) or overall image qualities like color.

Video2Vec

If machine learning on images involves high dimensions, videos involve even higher dimensions. Video2Vec does some initial dimension reduction by doing a first pass with convolutional neural networks.

Monday, March 21, 2016

DataFrame/DataSet swap places in Spark 2.0



In Spark 1.6, the developers behind Spark created DataSets by copying and pasting the code from DataFrames (and then added genericization and type safety). But in Spark 2.0, the tables are turned. Last week, Reynold Xin resolved SPARK-13880, "Rename DataFrame.scala as Dataset.scala." So what happens to DataFrames in Spark 2.0? They are reduced to a single line of code:

type DataFrame = Dataset[Row]

So whereas it could be said in Spark 1.6 that DataSets are a derivation of DataFrames, it is specifically the case in Spark 2.0 that DataFrames are a derivation of DataSets.
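Here is a small sketch of what that alias means in practice, using the Spark 2.0 API (the Person case class and the people.json path are hypothetical):

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

val spark = SparkSession.builder.appName("dfds").getOrCreate()
import spark.implicits._

case class Person(name: String, age: Long)

// A DataFrame is now just Dataset[Row]: untyped rows
val df: DataFrame = spark.read.json("people.json")

// The same data viewed as a typed Dataset
val ds: Dataset[Person] = df.as[Person]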

Friday, March 11, 2016

Symmetric Difference in GraphX

A question was posed over at the online forums for my book about how to implement symmetric difference in GraphX. The answer is the code below.

import org.apache.spark.graphx._
  
// A small example graph: vertices 1, 2, 3 with edges 1->2 and 1->3
val g = Graph(sc.makeRDD(Array((1L,""),(2L,""),(3L,""))),
              sc.makeRDD(Array(Edge(1L,2L,0),Edge(1L,3L,0))))

val ids = g.vertices.map(_._1).cache

// Build a new graph whose edges are all unordered pairs of vertices,
// attach each vertex's neighbor set, and compute the size of the
// symmetric difference of the two neighbor sets on every edge.
Graph(g.vertices, ids.cartesian(ids).filter(x => x._1 < x._2)
       .map(x => Edge(x._1,x._2,0)))
  .outerJoinVertices(g.collectNeighborIds(EdgeDirection.Either))(
     (_,_,u) => u.get.toSet)
  .mapTriplets(et => ((et.srcAttr | et.dstAttr) &~
                      (et.srcAttr & et.dstAttr)).size)
  .triplets
  .map(et => (et.srcId, et.dstId, et.attr))
  .collect

This short piece of code pulls a number of tricks. First is the overall strategy. The goal is to identify the symmetric difference size for every possible pair of vertices in the graph. This suggests that we need to do a Cartesian product to obtain all possible pairs of vertices. But rather than just getting the Cartesian product and doing an RDD map() directly off that, we instead create a whole new Graph where the edges are that Cartesian product. The reason is so that we can leverage outerJoinVertices() and glom on the set of nearest neighbors using collectNeighborIds() (which returns a VertexRDD, suitable for outerJoinVertices()).

And collectNeighborIds() itself is a powerful function that didn't get covered in my book. It's a convenient way to, for each vertex, gather the vertex Ids of all the neighbor vertices.

Finally, to compute the symmetric difference we use Scala Set operations, as the symmetric difference is defined as:

A Δ B = (A ∪ B) - (A ∩ B)

Note in Scala the set difference operator is &~ rather than -.
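A quick illustration of those Set operators in the Scala REPL:

val A = Set(1, 2, 3)
val B = Set(2, 3, 4)

A | B               // union: Set(1, 2, 3, 4)
A & B               // intersection: Set(2, 3)
(A | B) &~ (A & B)  // symmetric difference: Set(1, 4)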

Saturday, March 5, 2016

Beyond GraphX in graphs for Spark

This week Databricks announced GraphFrames, a library posted to spark-packages.org that is based on Spark SQL Dataframes rather than RDDs (as GraphX is). GraphFrames is still a work in progress -- it is currently at the 0.1 version -- so it provides interoperability with GraphX (graphs can be converted back and forth).
GraphFrames provides the graph querying capability that GraphX always had trouble with. GraphFrames, because it uses DataFrames from Spark SQL, allows you to query graphs using SQL. Plus GraphFrames sports a subset of Cypher, the query language from Neo4j.
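Here is a minimal sketch of that querying (the vertex and edge data are made up; the GraphFrames 0.1 package must be pulled in from spark-packages.org):

import org.graphframes.GraphFrame

// Vertices and edges are ordinary DataFrames
val v = sqlContext.createDataFrame(Seq(
  ("a", "Alice"), ("b", "Bob"), ("c", "Carol"))).toDF("id", "name")
val e = sqlContext.createDataFrame(Seq(
  ("a", "b", "follows"), ("b", "c", "follows"))).toDF("src", "dst", "relationship")

val g = GraphFrame(v, e)

// Motif finding, the Cypher-like subset mentioned above
g.find("(x)-[e1]->(y); (y)-[e2]->(z)").show()

// Plain Spark SQL over the underlying DataFrames
g.vertices.registerTempTable("vertices")
sqlContext.sql("SELECT name FROM vertices WHERE id = 'a'").show()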
I describe GraphFrames and provide some interesting examples in chapter 10 of my book. Chapter 10 was just released to the MEAP (Manning Early Access Program) for my book this week.
GraphFrames is also performant due to the two optimization layers built-in to Spark SQL: Catalyst and Tungsten. Catalyst is an RDBMS-style query plan optimizer, and Tungsten leverages the sun.misc.unsafe API to do direct OS memory access, bypassing the JVM (as well as avoiding garbage collection). Tungsten also performs code generation, generating JVM bytecode on the fly to access Tungsten-laid-out memory structures in a maximally efficient manner. One of the examples in my book shows an 8x speedup compared to the GraphX version.
And, in a hat tip to Andy Petrella, author of Spark Notebook, GraphFrames is not the only new graph library published on spark-packages.org. There are also:
  • Spark Centrality - Library for computing centrality for graph nodes
  • spark-beetweenness - k Betweenness Centrality algorithm for Spark using GraphX
  • sparkling-graph - Large scale, distributed graph processing made easy! Load your graph from multiple formats and compute measures (but not only)