Tag Archives: hadoop

Big Data and OfficeWriter

We partnered with Andrew Brust from Blue Badge Insights to integrate OfficeWriter with Hadoop and Big Data. Taking existing OfficeWriter sample projects, Andrew discusses how he created two demos showing OfficeWriter’s ability to work with Big Data. One demo uses C#-based MapReduce code to perform text-mining of Word docs. The other demo focuses on connecting to Hadoop through Hive.

In these demos you will learn:

  • How OfficeWriter integrates with Hadoop and Big Data
  • How to use ExcelWriter with Hadoop

How to Set Up Apache Mahout

Apache Mahout is a set of machine learning tools that deal with classification, clustering, recommendations, and related problems. We just bought a new book called Mahout in Action, which is full of good examples and general machine learning advice; you can find it here. Mahout is pretty neat and it’s growing quickly, so I decided to take the time to learn about it.

Mahout functions as a set of MapReduce jobs. It integrates cleanly with Hadoop, which makes it very attractive for doing text analysis on a large scale. Simpler queries, such as getting the average response time from a customer, are probably better suited to Hive.

Most examples I’ve seen use Mahout as sort of a black box. The command line just forwards arguments to various Driver classes, which then work their magic. All input and output seems to be through HDFS, and Mahout also uses intermediate temp directories inside HDFS. I tried changing one of the Driver classes to work with HBase data, but the amount of work that seemed to be necessary was non-trivial.


I decided to work with the Enron email data set because it’s reasonably large and it tells a story about fraud and corruption. Enron’s use of code words like ‘Raptor’ and ‘Death Star’ in place of other, more descriptive phrases makes topic analysis pretty interesting.

Please read ‘Important things to watch out for’ at the bottom of this post first if you want to follow along.

This is what I did to get the Enron mail set analyzed using the LDA algorithm (Latent Dirichlet Allocation), which looks for common topics in a corpus of text data:

  • The Enron emails are stored in the maildir format, a directory tree of text emails. In order to process the text, it first needs to be converted to SequenceFiles. A SequenceFile is a file format used extensively by Hadoop, and it contains a series of key/value pairs. One way to convert a directory of text to SequenceFiles is to use Mahout’s seqdirectory command:
    ./bin/mahout seqdirectory -i file:///home/georges/enron_mail_20110402 -o /data/enron_seq

    This can take a little while for large amounts of text, maybe 15 minutes. The SequenceFiles produced have key/value pairs where the key is the path of the file and the value is the text from that file.

  • Later on I wrote my own Java code which parsed out the mail headers to prevent them from interfering with the results. It is fairly simple to write a MapReduce task to quickly produce your own SequenceFiles. Also note that there are many other possible sources of text data, for instance Lucene indexes. There’s a list of ways to input text data here.
  • I needed to tokenize the SequenceFiles into vectors. Vectors in text analysis are a technical idea that I won’t get into, but these particular vectors are just simple term frequencies.
    ./bin/mahout seq2sparse -i /data/enron_seq -o /data/enron_vec_tf --norm 2 -wt tf -seq

    This command may need changing depending on which text analysis algorithm you’re using. Most algorithms require tf-idf instead, which weights a term’s frequency in a document against how common that term is across the whole corpus. This took 5 minutes on a 10-node AWS Hadoop cluster. (I set the cluster up using StarCluster, another neat tool for managing EC2 instances.)

  • I ran the LDA algorithm:
    ./bin/mahout lda -i /data/enron_vec_tf/tf-vectors -o /data/enron_lda -x 20 -k 10

    -x is the maximum number of iterations for the algorithm; -k is the number of topics to extract from the corpus. This took a little under 2 hours on my cluster.

  • List the LDA topics:
    ./bin/mahout ldatopics -i /data/enron_lda/state-4 --dict /data/enron_vec_tf/dictionary.file-0 -w 5 --dictionaryType sequencefile

    This command is a bit of a pain because it doesn’t actually error out when you pass an incorrect parameter; it just does nothing. Here’s some of the output I got:

    MAHOUT_LOCAL is not set; adding HADOOP_CONF_DIR to classpath.
    Running on hadoop, using HADOOP_HOME=/usr/lib/hadoop-0.20
    MAHOUT-JOB: /data/mahout-distribution-0.5/examples/target/mahout-examples-0.6-SNAPSHOT-job.jar
    Topic 0
    i [p(i|topic_0) = 0.023824791149925677
    information [p(information|topic_0) = 0.004141992353710214
    i'm [p(i'm|topic_0) = 0.0012614859683494856
    i'll [p(i'll|topic_0) = 7.433430267661564E-4
    i've [p(i've|topic_0) = 4.22765928967555E-4
    Topic 1
    you [p(you|topic_1) = 0.013807669181244436
    you're [p(you're|topic_1) = 3.431068629183266E-4
    you'll [p(you'll|topic_1) = 1.0412948245383297E-4
    you'd [p(you'd|topic_1) = 8.39664771688153E-5
    you'all [p(you'all|topic_1) = 1.5437174634592594E-6
    Topic 2
    you [p(you|topic_2) = 0.03938587430317399
    we [p(we|topic_2) = 0.010675333661142919
    your [p(your|topic_2) = 0.0038312042763726448
    meeting [p(meeting|topic_2) = 0.002407369369715602
    message [p(message|topic_2) = 0.0018055376982080878
    Topic 3
    you [p(you|topic_3) = 0.036593494258252174
    your [p(your|topic_3) = 0.003970284840960353
    i'm [p(i'm|topic_3) = 0.0013595988902916712
    i'll [p(i'll|topic_3) = 5.879175074800994E-4
    i've [p(i've|topic_3) = 3.9887853536102604E-4
    Topic 4
    i [p(i|topic_4) = 0.027838628233581693
    john [p(john|topic_4) = 0.002320786569676983
    jones [p(jones|topic_4) = 6.79365597839018E-4
    jpg [p(jpg|topic_4) = 1.5296038761774956E-4
    johnson [p(johnson|topic_4) = 9.771211326361852E-5
  • Looks like the data needs a lot of munging to provide more useful results. Still, you can see the relationship between some of the words in each topic.
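As a non-MapReduce illustration of the header-stripping step mentioned above, here’s a minimal Python sketch using the standard library’s email module. The sample message is made up, and a real job would do this inside a mapper over SequenceFile values, but the parsing logic is the same:

```python
import email

def strip_headers(raw_message: str) -> str:
    """Return just the body text of a raw email, dropping headers like
    From/Subject so they don't pollute the term vectors."""
    msg = email.message_from_string(raw_message)
    parts = []
    for part in msg.walk():
        if part.get_content_type() == "text/plain":
            payload = part.get_payload(decode=True)
            if payload:
                parts.append(payload.decode("utf-8", errors="replace"))
    return "\n".join(parts)

raw = "From: someone@enron.com\nSubject: Raptor\n\nThe Raptor vehicles are ready."
print(strip_headers(raw))  # -> The Raptor vehicles are ready.
```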

I recommend playing around with the examples in the examples/bin directory in the Mahout folder.

Important things to watch out for

  • I ran out of heap space once I asked Mahout to do some real work, so I needed to increase the heap size for child MapReduce processes. How to do this is basically described here; you only need the -Xmx option, and I went for 2 gigabytes (setting mapred.child.java.opts to -Xmx2048m).

    You may also want to set MAHOUT_HEAPSIZE to 2048, but I’m not sure how much this matters.

  • Some environment variables weren’t set on my StarCluster instance by default, and the warnings are subtle. HADOOP_HOME is particularly important: if it isn’t set, MapReduce jobs run as local jobs. I saw weird exceptions accessing HDFS, and my jobs didn’t show up in the job tracker. Hadoop does warn you in the console output for the job, but it’s easy to miss. JAVA_HOME is also important, but Mahout will explicitly error and tell you to set it. HADOOP_CONF_DIR should be set to $HADOOP_HOME/conf; for some reason Mahout assumes you want $HADOOP_HOME/src/conf if you don’t specify. Also set MAHOUT_HOME to your Mahout directory, so it can add its jar files to the CLASSPATH correctly.
  • I ended up compiling Mahout from source. The stable version of Mahout gave me errors I couldn’t really explain: file system mismatches or vector mismatches or something like that. I’m not 100% sure compiling from source is necessary, but it probably won’t hurt. Compilation is pretty simple, ‘mvn clean install’, but you will probably want to add ‘-DskipTests’ because the tests take a long time.

Boston Hadoop Meetup Group: The Trumpet of the Elephant

Heheh. But seriously, if you live in the Boston area and are working with Hadoop, or interested in working with Hadoop, or just think the name is fun to say, you should absolutely clear your calendar the night of February 15. Why? Because it’s the first Boston Hadoop Meetup Group since November, and judging by the presenter line-up, it’s going to be a doozie (or an Oozie, if you want to get all topical).

First up, MapR’s Chief Application Architect Ted Dunning (t|l) on using Machine Learning within Hadoop. I’m really excited about this one.

Second, Cloudera Systems Engineer Adam Smieszy (t|l) on integrating Hadoop into your existing data management and analysis workflows.

Last, Hadapt’s CTO Philip Wickline (t|l) “will give a high-level discussion about the differences between HBase and Hive, and about transactional versus analytical workloads more generally speaking, and dive into the systems required for each type of workload.”

Each talk will run about 15-20 minutes, with time for Q&A after, followed by (free) beer and mingling.

The Boston Hadoop MeetUp Group is organized by Hadapt’s Reed Shea (t|l). Hadapt is doing some very, very cool stuff with unstructured and structured data processing and analytics: cool enough that founder/Chief Scientist Daniel Abadi took teaching leave from Yale to turn his research into a product.

This particular MeetUp is sponsored by Hadapt, MapR, Cloudera and Fidelity, and is being held at Fidelity’s downtown office, from 6 to about 8:30 pm. For more information and to sign up, visit the event page.

See you there!

Boston’s Big Datascape, Part 1

[Excerpted from the Riparian Data blog]
Big Data, or the technologies, languages, databases and platforms used to efficiently store, analyze and extract conclusions from massive data sets, is a Big Trend right now. Why? In a nutshell, because a) we are generating ever increasing amounts of data, and b) we keep learning faster, easier and more accurate ways of handling and extracting business value from it. On Wall Street, some investment banks and hedgefunds are incorporating sentiment analysis of web documents into their trading strategies. In healthcare, companies like WellPoint, Explorys and Apixio are using distributed computing to mine health records, practice guidelines, studies and medical/service costs to more accurately and affordably insure, diagnose and treat patients.

Unsurprisingly, Silicon Valley is big data’s epicenter, but Boston, long a bastion of Life Sciences, Healthcare, High Tech and Higher Ed, is becoming an important player, particularly in the storage and analytics arenas. This series aims to spotlight some of the current and future game changers. These companies differ in growth stages, target markets and revenue models, but converge around their belief that the data is the castle, and their tools the keys.

1) Recorded Future

  • Product: Recorded Future is an API that scans, analyzes and visualizes the sentiment and momentum of specified references in publicly available web documents (news sites, blogs, government sites, social media sites, etc.)
  • Founder/CEO: Christopher Ahlberg
  • Technologies used: JSON, real-time data feeds, predictive modeling, sentiment analysis
  • Target Industries: Financial Services, Competitive Intelligence, Defense Intelligence
  • Located: Cambridge, MA

2) Hadapt

  • Product: The Hadapt Adaptive Analytical Platform is a single system for processing, querying and analyzing both structured and unstructured data. The platform doesn’t need connectors, and supports SQL queries.
  • Founders: Justin Borgman (CEO); Dr. Daniel Abadi (Chief Scientist)
  • Technologies used: Hadoop, SQL, Adaptive Query Execution™
  • Target Industries: Financial Services, Healthcare, Telecom, Government

[Read the full post]

Combiners: The Optional Step to MapReduce

Most of us know that Hadoop MapReduce is made up of mappers and reducers. Map tasks run on task trackers; then all the values for each key are collected from the mappers and sent to other task trackers for reducing, with the key space partitioned among the reduce tasks. But what slightly fewer of us know about are combiners. A combiner is an optimization that can run after mapping but before the intermediate data is shuffled to other machines by key. Combiners often perform the exact same function as the reducer, but only on the subset of data produced by one mapper. This gives the task tracker an opportunity to shrink the intermediate data it must send along to the reducers.

For instance, take the ubiquitous word count example. Two mappers may produce results like this:

Mapper A    Mapper B
X - 1       X - 1
Y - 1       X - 1
Z - 1       Z - 1
X - 1       Y - 1
            Y - 1

All those key-value pairs will need to be passed to the reducers to tabulate the values. But suppose the reducer is also used as a combiner (which is quite often the case) and it gets called on both sets of results before they’re passed along:

Mapper A    Mapper B
X - 2       X - 2
Y - 1       Z - 1
Z - 1       Y - 2

The traffic load has been reduced. Now all that’s left to do is call the reducers on the keys across all map results to produce:

X - 4
Z - 2
Y - 3

An important point to keep in mind is that the combiner is not always called, even when you assign one. The mapper will generally only call the combiner when the intermediate data it’s producing gets large, perhaps to the point that it must be spilled to disk before it can be sent. That’s why it’s important that the combiner not change the inherent form of the data it processes: it must produce the same sort of key-value pairs that it reads in. In the above example, the combiners read (word, sum) pairs and wrote out (word, sum) pairs.
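To make the bookkeeping above concrete, here’s a small plain-Python simulation of those tables (this is an illustration, not Hadoop code, and the function names are mine). Note that the combiner’s output has exactly the same (word, count) form as its input, which is why the same function can safely serve as both combiner and reducer:

```python
from collections import Counter

def map_words(text):
    """Map phase: emit a (word, 1) pair for every word."""
    return [(word, 1) for word in text.split()]

def combine(pairs):
    """Combiner/reducer: sum counts. Reads (word, count) pairs and emits
    (word, count) pairs, preserving the form of the data."""
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return sorted(totals.items())

mapper_a = map_words("X Y Z X")      # 4 intermediate pairs
mapper_b = map_words("X X Z Y Y")    # 5 intermediate pairs

combined_a = combine(mapper_a)       # [('X', 2), ('Y', 1), ('Z', 1)]
combined_b = combine(mapper_b)       # [('X', 2), ('Y', 2), ('Z', 1)]

# Only 6 pairs cross the network instead of 9. The reducer applies the
# same summation across all mappers' output:
final = combine(combined_a + combined_b)
print(final)                         # [('X', 4), ('Y', 3), ('Z', 2)]
```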

image via: wpclipart.com

Speculative Execution: Proceed with Caution (or Not at All)


When a job tracker receives a MapReduce job, it divvies out tasks to several task trackers in order to complete the job. If any of those tasks fails for whatever reason (perhaps it threw an exception), it’s up to the job tracker to restart the task on another slave. This process can occur up to three times before the job tracker gives up. But what happens if a task doesn’t fail, but doesn’t succeed either? What if it just hangs? Perhaps that map task received an extra large or extra tough block to work with. Maybe some other application on that task tracker is hogging the entire CPU. Maybe the task has entered an infinite loop. Either way, the task tracker continues to check in from time to time, which prevents it from being killed outright, but it just isn’t finishing. The job tracker can’t possibly know why this task is taking longer, nor can it know when or if it will finish. What does the job tracker do?

Speculative Execution!

Without shutting down the first task, the job tracker gives the same work to another task tracker. Then it’s a race: whoever finishes first gets to submit its results, and the other is killed (a most cutthroat race). That’s it.

Speculative execution isn’t always appropriate. In fact, some people recommend that you disable it for reduce jobs entirely. Why? Continue reading Speculative Execution: Proceed with Caution (or Not at All)
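If you do decide to turn it off, speculative execution is controlled per task type. Using the property names from the Hadoop 0.20 line mentioned elsewhere on this page (newer releases renamed them to mapreduce.map.speculative and mapreduce.reduce.speculative), disabling it for reduce tasks looks like this in mapred-site.xml or your job configuration:

```xml
<property>
  <name>mapred.reduce.tasks.speculative.execution</name>
  <value>false</value>
</property>
```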

Traversing Graphs with MapReduce

Hadoop can be used to perform breadth-first searches through graphs. One way is to run a series of MapReduce jobs, where each job expands the search by one more layer. Here is a very high-level explanation of what I mean. Suppose we have the simple graph:

 E <-- C <-- F
 ^     ^     ^
 |     |     |
 A --> B --> D

This data would likely be represented in our Hadoop cluster as a list of connections, like: Continue reading Traversing Graphs with MapReduce
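To sketch what one of those layers looks like, here’s a plain-Python simulation of a single breadth-first-search round over the graph above (an illustration of the idea, not actual Hadoop code). Each record pairs a node with its adjacency list and its best known distance from the start node A:

```python
INF = float("inf")

def map_phase(records):
    """For each node on the current frontier, emit a candidate distance
    for each of its neighbors; always re-emit the node's own record."""
    out = []
    for node, (adj, dist) in records:
        out.append((node, (adj, dist)))
        if dist != INF:
            for nbr in adj:
                out.append((nbr, ([], dist + 1)))
    return out

def reduce_phase(pairs):
    """For each node, keep its adjacency list and the minimum distance seen."""
    best = {}
    for node, (adj, dist) in pairs:
        cur_adj, cur_dist = best.get(node, ([], INF))
        best[node] = (adj or cur_adj, min(dist, cur_dist))
    return best

# The graph from the diagram: A->B, A->E, B->C, B->D, C->E, D->F, F->C.
graph = {"A": (["B", "E"], 0), "B": (["C", "D"], INF),
         "C": (["E"], INF),    "D": (["F"], INF),
         "E": ([], INF),       "F": (["C"], INF)}

layer1 = reduce_phase(map_phase(graph.items()))
# After one round, A's neighbors B and E are at distance 1; a second
# round would reach C and D, and so on, one MapReduce job per layer.
```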

For the Data Scientists: 5 Upcoming Big Data Conferences You Shouldn’t Miss

Big data is a big deal right now, and it’s only going to become a bigger deal in the future, so it makes sense to learn about as many of its aspects as you can, as quickly as you can. Or pick one and learn it very well. Or don’t pick any, if you are a staunch believer in the shelf-life of traditional data warehouses. From a machine learning deep-dive to an open-source buffet, the following five conferences provide educational and networking opportunities for both the specialists and renaissance persons among you. Attending a cool one I’ve missed? Let me know in the comments!