Sunday, October 12, 2014

Elasticsearch experimentation with Enron emails.

ES (Elasticsearch) is a distributed, real-time search and analytics engine. I have been using this product for more than 18 months. It is the de-facto distributed search solution built on Lucene, and it is fast and easy to develop with.
Its features can be broadly classified as:

  • Full text search: search by words with boolean operators (Ex: (java OR technology) AND (bangalore OR hyderabad))
  • Easy to build filters, facets & aggregations
  • Fast response times with large data volumes (usually measured in milliseconds)
  • Ability to make geo-localized searches & a configurable, robust ranking system

It's an incredible tool for collecting analytics information &, in my experience, possibly the easiest to build with.

Once we convinced ourselves that ES is the best option for a given use case, product managers/analysts usually come up with questions like "what is the node/RAM/disk requirement for indexing (say 1TB of) data?" & "what indexing strategy should we use?" (index schema, index sharding that could be time based or data based, etc.). Unfortunately there is no immediate correct answer: a lot of data points & query characteristics need to be collected before we can even attempt an answer, although the people asking these questions are usually uncomfortable waiting for lengthy answers.
Here are a few guideline questions to ask before trying to come up with an answer:
  • required indexing rate (acceptable/desirable limits)
  • total # of docs, with payload characteristics
  • latency requirements for searches (per type of search)
  • full text search requirements (highlighting, analyzers, internationalization)
  • indexing schema requirements (is there any join requirement with nested elements?)
  • indexing schema metadata requirements (especially for date fields & their precision requirements)
Although internet literature claims ES scales to insane numbers, it is all contextual (nature of data) and situational (nature of hardware). There is no option but to experiment with the real required data to assert the claim and quantify the hardware/software requirements, as suggested by the creator of ES, Shay Banon.
There are many indexing options and tuning parameters for ES. Exercises like the one below should help to find the holy-grail combination of ES tuning parameters for a given use case.

Here is an effort, or sample guideline, that could be used to carry out such experiments. The usual approach of generating data with random values has the disadvantage that the data will have more/less unique content than is typical for the domain, so it may not give the desired results &, worse, it could be misleading. Lucene is mainly used for full text search and stores the data in an inverted fashion, i.e. terms to docs. The nature of the terms plays a very important role in deciding the index size and query latency. In Lucene a term is stored once per field, and searches work by locating terms and then finding the documents associated with them.
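To make the terms-to-docs idea concrete, here is a tiny, self-contained Python sketch (not ES/Lucene code, just an illustration with made-up documents) of how an inverted index maps terms to the documents that contain them:

from collections import defaultdict

# Toy documents, keyed by doc id.
docs = {
    1: "meeting moved to friday",
    2: "friday report attached",
    3: "please review the report",
}

# Build the inverted index: term -> set of doc ids containing that term.
inverted = defaultdict(set)
for doc_id, text in docs.items():
    for term in text.lower().split():
        inverted[term].add(doc_id)

# A search locates the terms first, then intersects their doc id sets.
def search(*terms):
    result = None
    for term in terms:
        postings = inverted.get(term, set())
        result = postings if result is None else result & postings
    return result or set()

print(search("friday"))            # {1, 2}
print(search("friday", "report"))  # {2}

This is why the uniqueness of terms in the corpus, not just the raw byte count, drives both index size and query latency.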

There is a set of free, large corpora available that could be used. Here I have picked the Enron email data, as it is pretty decent in numbers given that we mostly restrict index sizes to these numbers, making it easy to extrapolate, since ES indexes are sharded to scale horizontally. It is also nice to experiment with ES features on this publicly available mass collection of "real" emails, which is not bound by privacy and legal restrictions.
Although I wrote all my framework code to deal with ES in Java (making use of the Spring Data Elasticsearch wrapper), I used Python for the current exercise. It is awesome for running deploy tool-chains (pants), data processing pipelines and other offline systems. It is now my de-facto scripting language, replacing Groovy. It is easy to share as it is a concise and reader-friendly language.

Before the approach & the code, here are some highlights & observations about this data.

1. If we enable the source (which is required for the highlighting feature) while indexing, the storage cost increases by 76%:
http://localhost:9200/_cat/indices
enron-email_without_source 746.5mb  
enron-email 1.2gb
It was observed that there was no change in search latency when storing the source.

The general assumption that the Lucene index size is 20-30% of the actual content may not be correct, as it depends on the indexed fields. Even with better compression it is unlikely the final figure would change drastically here; it is ~50% here (with source not enabled).
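For reference, this is roughly how the source can be disabled in the type mapping when creating the smaller index (a minimal sketch using the elasticsearch Python client; the index/type/field names are just the ones assumed for this exercise):

from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])

# Same mapping, but with _source disabled; trades the highlighting feature
# (and retrieving original bodies) for a noticeably smaller index.
es.indices.create(
    index="enron-email_without_source",
    body={
        "mappings": {
            "email": {
                "_source": {"enabled": False},
                "properties": {
                    "subject": {"type": "string"},
                    "body": {"type": "string"},
                },
            }
        }
    },
)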


2. Payload size distribution. It is important to match the use case on how the real domain documents are getting pumped into ES.
Here again the ES aggregation feature comes in handy for finding out the payload size distribution.
Payload range     Count
0KB to 1KB       124181
1KB to 5KB       237870
5KB to 10KB       42331
10KB to 1MB         303


You can find the ES aggregation used to extract this information below. I introduced a size field to capture the size of each document & the above is the result. I guess this is the most killer feature in ES for building data analytics applications.
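A minimal sketch of the kind of range aggregation used here, assuming the size field I introduced holds the payload size in bytes:

from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])

# Bucket documents by payload size; "size": 0 skips the hits themselves.
response = es.search(
    index="enron-email",
    body={
        "size": 0,
        "aggs": {
            "payload_ranges": {
                "range": {
                    "field": "size",
                    "ranges": [
                        {"to": 1024},                    # 0KB to 1KB
                        {"from": 1024, "to": 5120},      # 1KB to 5KB
                        {"from": 5120, "to": 10240},     # 5KB to 10KB
                        {"from": 10240, "to": 1048576},  # 10KB to 1MB
                    ],
                }
            }
        },
    },
)

for bucket in response["aggregations"]["payload_ranges"]["buckets"]:
    print(bucket.get("from", 0), bucket.get("to"), bucket["doc_count"])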

3. There were in total 66043 unique 'to' addresses & 21021 unique 'from' addresses. The Enron mails were extracted from the email in-boxes (with their complete folder structure) of 148 employees.
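The unique-address counts can be pulled with a cardinality aggregation; a sketch, assuming the addresses are indexed as not_analyzed fields named to and from:

from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])

# Approximate distinct counts for the recipient/sender fields.
response = es.search(
    index="enron-email",
    body={
        "size": 0,
        "aggs": {
            "unique_to": {"cardinality": {"field": "to"}},
            "unique_from": {"cardinality": {"field": "from"}},
        },
    },
)

print(response["aggregations"]["unique_to"]["value"])
print(response["aggregations"]["unique_from"]["value"])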



Transform the raw Enron data into JSON and push it into ES.
ES works with JSON & the raw data needs to be transformed before going into ES. Download the data & extract the content into a directory.
Here is the Python script I used (on a Windows machine). The script expects as its first parameter the parent directory where the Enron mails have been extracted, recursively parses the emails in EML format and indexes them into ES. It also creates a fresh index (deleting the previous one if it exists), making it easy to run repeated experiments.
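The original embedded script isn't reproduced here, but its overall shape was roughly the following (a hedged sketch using the elasticsearch Python client and the standard email module; index, type and field names are illustrative):

import os
import sys
from email import message_from_file

from elasticsearch import Elasticsearch, helpers

INDEX = "enron-email"
DOC_TYPE = "email"

def eml_docs(root_dir):
    """Walk the extracted mail tree and yield one JSON doc per message."""
    for dirpath, _, filenames in os.walk(root_dir):
        for name in filenames:
            path = os.path.join(dirpath, name)
            with open(path) as fh:
                msg = message_from_file(fh)
            body = msg.get_payload() if not msg.is_multipart() else ""
            yield {
                "_index": INDEX,
                "_type": DOC_TYPE,
                "_source": {
                    "from": msg.get("From"),
                    "to": msg.get("To"),
                    "subject": msg.get("Subject"),
                    "date": msg.get("Date"),
                    "body": body,
                    "size": os.path.getsize(path),
                },
            }

if __name__ == "__main__":
    root_dir = sys.argv[1]  # parent directory of the extracted Enron mails
    es = Elasticsearch(["http://localhost:9200"])

    # Recreate the index so every run starts from a clean slate.
    es.indices.delete(index=INDEX, ignore=[404])
    es.indices.create(index=INDEX)

    # Bulk index the parsed messages.
    helpers.bulk(es, eml_docs(root_dir))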


Search and test the generated data.
Here is a Python script that shows how we can run searches and use other powerful analytics features like aggregations.
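Again, a minimal hedged sketch rather than the original script: a full text query with boolean operators, plus a terms aggregation on the sender field in the same round trip.

from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])

# Full text search over the body with boolean operators, plus the
# top senders as a terms aggregation.
response = es.search(
    index="enron-email",
    body={
        "query": {
            "query_string": {
                "query": "(gas OR power) AND california",
                "default_field": "body",
            }
        },
        "aggs": {
            "top_senders": {"terms": {"field": "from", "size": 10}}
        },
        "size": 5,
    },
)

print("total hits:", response["hits"]["total"])
for hit in response["hits"]["hits"]:
    print(hit["_source"].get("subject"))
for bucket in response["aggregations"]["top_senders"]["buckets"]:
    print(bucket["key"], bucket["doc_count"])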



Although I quashed the idea of using ES (storing the content along with the index) as primary rather than secondary storage right at the beginning, I now think it is worth revisiting with the availability of the snapshot feature.

Hope this helps someone trying out ES for the first time.

Sunday, July 13, 2014

About my current server side software stack

Here are my thoughts on working with NoSQL technologies over the past year and more, while building a platform in the archival/e-discovery space where the cost of data storage matters a lot. Clearly it is very difficult to claim to be an expert on every possible technology trend, especially in NoSQL where technologies and use cases overlap to solve a problem domain & it is not easy to pick one against the other. There is also a cost to maintaining multiple data sources (code base & relationships). Here I am documenting an overview of the technologies we picked, and I plan to document the major issues and the best parts separately for each part of the stack in the next set of write-ups, which can help developers make informed technical decisions, especially with my favorites "Elasticsearch" and "Hazelcast" with the Spring Data wrapper libraries.

I guess the paradigm shift is "think about the queries (business) and horizontal scaling & work backwards", rather than the data-first approach (storage, integrity) where you make queries work against the model. Data storage is driven by query usage patterns (or functional requirements) and the cost of building and maintaining it. Latency and storage costs have moved from non-functional to basic functional requirements; distributed computing has become a basic necessity and not a luxury. In the NoSQL context data is still the king, but it is at the mercy of query access patterns and the storage cost. At the end of the day, most of the time software is all about storing & retrieving useful data in a cost-effective way. Performance/transactions should not be the reason to abandon RDBMS; distribution of data (because of scale) and ease of development are the reasons. Shortening development cycles, lowering costs and enabling new ideas should be the goal for any technology professional & non-relational data stores increasingly fit the bill.

There is no single silver bullet for the data storage requirements of any serious project (or for anything, for that matter); it has to be polyglot persistence & the trend looks irreversible. The important point here is that each of these stores gets its scalability & performance by offering a limited data model, and compromises on one of the CAP properties (Consistency, Availability, Partition tolerance), although everyone claims to do almost anything. Name a feature and the evangelists from the NoSQL camps come up with a post saying "Look ma! I can also do this", but it is important for us to make informed decisions after doing POCs, making sure that these products are not stretching themselves too far to achieve the goal (latency, storage efficiency). It was key to pick the storage nodes & the technology behind them considering their sweet spots & after evaluating them against their counterparts.

Based on use cases we made the following decisions:
  • Search by key - document store (MongoDB): serves most of the RDBMS requirements. Ex: blog
  • Search by term or value - full text search (Elasticsearch): serves the ability to look for text and then the document. Ex: conversations, documents
  • Search and store by order - column family (Cassandra): serves the ability to constantly write data usually associated with time series. Ex: twitter
  • Search for large binary objects one at a time - object store provided by cloud vendors, Ceph
  • Search/process for results in millis - in-memory data grid solution (Hazelcast)
Although all are scalable solutions, Cassandra stands out as the best in linear scalability, offering both read efficiency (for range based queries) and write efficiency irrespective of data size, as a column family keeps the columns that fit together & works in append-only mode, making writes faster than reads & independent of the current size of the data.
The future plan is to use graphs:
  • Search by relationship - graph DB (Neo4j): users and their relationships. Ex: social graph

Although there are numerous links suggesting selection criteria, I found the following interesting:
http://blog.nahurst.com/visual-guide-to-nosql-systems - explains, following CAP theory, what you will be giving up among the three (Consistency, Availability & Partition tolerance) with the most prominent solutions in this space.
http://highscalability.com/blog/2011/6/20/35-use-cases-for-choosing-your-next-nosql-database.html - documents use cases.
http://kkovacs.eu/cassandra-vs-mongodb-vs-couchdb-vs-redis - easy to follow, well documented introductions.

KAFKA 
A distributed pub-sub messaging system designed for throughput. It is maintained/used at LinkedIn & more importantly written in Scala, which is my favorite language these days. Kafka is log centric, ordered, immutable and sequence based (offset). It replicates and persists messages by default. It concentrates on being a durable, scalable message store & does that pretty well.
As the author defines it, "Kafka is a system that supports long retention of ordered data" - mainly used as a pipeline pushing data into systems in a re-playable way.

Buzzwords:
topic - category of data.
partition - log of records (data maintained for a duration: week, month, ...).
log sequence number (offset) - acts as the state of the system.
consumer/producer - for pulling and pushing the messages (see the sketch below).
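A minimal producer/consumer sketch using the kafka-python client, just to put the buzzwords together; the topic name and broker address are made up:

from kafka import KafkaProducer, KafkaConsumer

BROKERS = "localhost:9092"
TOPIC = "index-requests"  # topic: a category of data

# The producer pushes messages onto the topic; Kafka appends them to a partition log.
producer = KafkaProducer(bootstrap_servers=BROKERS)
producer.send(TOPIC, b'{"doc_id": 42, "action": "index"}')
producer.flush()

# The consumer pulls messages back; the offset it has reached is the state of the system.
consumer = KafkaConsumer(TOPIC,
                         bootstrap_servers=BROKERS,
                         auto_offset_reset="earliest",
                         consumer_timeout_ms=5000)
for record in consumer:
    print(record.partition, record.offset, record.value)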

Biggest success story:
LinkedIn claims to process 175 TB of in-flight data, with ~1.5 ms latency, at 7 million writes and 35 million reads/second. This is what I heard from the Kafka author himself when I recently listened to a webinar hosted by O'Reilly publishers.

Uses:
Zookeeper

Issues:
No easy way to trap re-balancing events. The Scala code is difficult to work with, especially from existing Java code.

STORM

Storm is like a pipeline into which you push individual events that then get processed in a distributed fashion. It makes it easy to process massive streams of data in a scalable way, and provides mechanisms for doing things like guaranteeing that the data will be processed.

There is always some confusion about its relationship with Hadoop; the Storm creator clearly says they are complementary & offers the following thumb rule for adoption:
"Any time you need to look at data historically (already saved), use batch processing (i.e. Hadoop) as it is cheap & scalable; whenever you need to look at the data as it comes in, in real time, use Storm for that."

The ability of Storm to run multiple copies of a bolt's/spout's code in parallel (depending on the parallelism setting) is the crux of the solution and helps it scale to insane numbers, by taking care of cluster communication, fault-tolerant fail-over, and scalable & durable messaging with ordering, while distributing topologies across cluster nodes. Each worker can be assigned processes, memory & threads (tasks) depending on its requirements, making it ideal for any growing company with horizontal scaling requirements.

Buzzwords:
Spout - a source of streams in a topology, making it the starting point.
Bolt - any piece of code, which does arbitrary processing on the incoming tuples.
tuple - the basic unit of data that gets emitted at each step.
topology - the layout of the communication, which is static and defined upfront; a deployable workflow that ties spouts and bolts together. It can also be described as a graph consisting of spouts (which produce tuples) and bolts (which transform tuples).
Nimbus - similar to the Hadoop job tracker; the service that tracks all the workers executing the bolts.
supervisor - worker (a collection of executors).

Spouts are designed and intended to poll; we can't push to them. So we have Kafka to keep the messages, and the spout is a consumer triggering the indexing process.

Built on:
LMAX Disruptor : intra-worker communication in Storm (inter-thread on the same Storm node)
ZeroMQ or Netty: Inter-worker communication (node-to-node across the network)
Zookeeper: cluster management of Storm  (automatic fail-over, automatic balancing of workers across the cluster)

Biggest success story:
Twitter claims to index four hundred million tweets within 50 milliseconds with the help of storm.

The issues:
Inter-topology communication: nothing built into Storm (we used Kafka). The UI isn't mature. Storm is not well suited to handling OOM kinds of errors. It is tough to manage state across the bolts (errors, progress).

Samza & Spark also try to solve the same kinds of problems that Storm is trying to solve. Here are some notes comparing Storm with Spark, which apparently is doing great & looks to have the largest momentum.

Storm Vs Spark
Both frameworks are used to parallelize computations over massive amounts of data. With Storm, you move data to code. With Spark, you move code to data (data parallelism). Spark is based on the idea that, when the existing data volume is huge, it is cheaper to move the processing to the data (similar to Hadoop map/reduce, except that memory is used aggressively to avoid I/O, which makes it efficient for iterative algorithms).
Storm, however, is good at dynamically processing numerous generated/collected small data items (such as calculating some aggregation function or analytics in real time on a Twitter stream), also called task parallelism. Spark applies to a corpus of existing data (like Hadoop) which has been imported into the Spark cluster, provides fast scanning capabilities due to in-memory management, and minimizes the global number of I/Os for iterative algorithms.
Storm is like a pipeline into which you push individual events, which then get processed in a distributed fashion.
Spark instead follows a model where events are collected and then processed at short time intervals (a few seconds) in a batch manner.
Spark's approach to fault tolerance is that instead of persisting or checkpointing intermediate results, Spark remembers the sequence of operations which led to a certain data set, so when a node fails, Spark reconstructs the data set based on the stored information.

LOGSTASH 
Provides a centralized logging server that listens for connections from Java application servers, accepting streams of logs when they connect & filtering, modifying, and routing those streams to the appropriate outputs. It is one of the cornerstone products in the Elasticsearch family.

ELASTICSEARCH

A distributed (through shards) and highly available (through replicas) search engine. It is document oriented and dynamic, and provides reliable, asynchronous write-behind for long term persistence, with (near) real time search support.

Buzzwords:
Shard : a single Lucene instance
Replica : a copy of a primary shard, for fail-over and performance
Index type : is like a table

An index can be sharded with a configurable number of shards & each shard can have one or more replicas.

Tuning we applied for indexing (a sketch of applying a few of these settings follows this list):
  • Used the native Java ES APIs instead of HTTP - via the Spring Data wrapper.
  • Increased the refresh interval to a higher number (index.engine.robin.refresh_interval); the default is 1 second.
  • Increased the indexing buffer size (indices.memory.index_buffer_size); it defaults to 10%.
  • Increased the number of dirty operations that trigger an automatic flush (so the translog won't get really big, even though it is FS based) by setting index.translog.flush_threshold (default is 5000).
  • Increased the memory allocated to the Elasticsearch node; the default is 1G.
  • Decreased the replica count (down to 0), and increased it later for HA and search performance. Replicas can be changed at runtime but not the number of shards.
  • Increased the number of machines so fewer shards get allocated per machine.
  • Used scan mode for searches that retrieve all results.
  • Planning to use SSDs for the translog & check the performance.
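A minimal sketch of how a couple of these index-level settings might be applied from the Python client (setting names and defaults vary across ES versions, so treat this as illustrative rather than definitive):

from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])

# Bulk-indexing friendly settings: slow down refreshes and drop replicas,
# then restore them once the initial load is done.
es.indices.put_settings(
    index="enron-email",
    body={
        "index": {
            "refresh_interval": "30s",   # default is 1s
            "number_of_replicas": 0,     # add replicas back later for HA
        }
    },
)

# ... run the bulk indexing ...

es.indices.put_settings(
    index="enron-email",
    body={"index": {"refresh_interval": "1s", "number_of_replicas": 1}},
)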

Lucene (the search engine behind Elasticsearch) requires these to be in RAM, so we should be paying attention to: "deleted docs", "norms", "terms index", "field cache", "doc values".
In Lucene a term is stored once per field, and searches work by locating terms and finding documents. So initially the index size relative to the content will decrease and will stabilize after most of the terms have been indexed. Don't worry about the initial numbers; it usually translates into 20-30% of the content.

CASSANDRA

A column-oriented way of modeling a system that relies on eventual consistency.

Buzzwords:
Column - the basic element, a tuple composed of a timestamp, a column name and a column value. The timestamp is set by the client, and this has an architectural impact on clients' clock synchronization.
SuperColumn - a slightly more complex structure; you can imagine it as a column which can store a dynamic list of Columns.
ColumnFamily - a set of columns.
KeySpace - a set of ColumnFamilies.
Row - in the Cassandra sense, a list of Columns or SuperColumns identified by a row key.
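To connect these buzzwords to the "search and store by order" use case above, here is a hedged sketch using the DataStax Python driver and CQL (the newer interface over the same column-family model; keyspace and table names are made up):

from datetime import datetime

from cassandra.cluster import Cluster

cluster = Cluster(["127.0.0.1"])
session = cluster.connect()

session.execute("""
    CREATE KEYSPACE IF NOT EXISTS demo
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
""")

# A time-series style column family: the row key is the user, and the
# clustering column keeps the columns within that row ordered by time.
session.execute("""
    CREATE TABLE IF NOT EXISTS demo.tweets (
        user_id    text,
        created_at timestamp,
        body       text,
        PRIMARY KEY (user_id, created_at)
    ) WITH CLUSTERING ORDER BY (created_at DESC)
""")

# Writes are append-only and cheap, which suits constant time-series ingestion.
session.execute(
    "INSERT INTO demo.tweets (user_id, created_at, body) VALUES (%s, %s, %s)",
    ("alice", datetime.utcnow(), "append-only writes are cheap"),
)

# A range-style read within a single row key, served in clustering order.
for row in session.execute(
        "SELECT created_at, body FROM demo.tweets WHERE user_id = %s LIMIT 10",
        ("alice",)):
    print(row.created_at, row.body)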

Cassandra only waits for a configurable number N (often under 3) of replicas to acknowledge before returning. Therefore a write can succeed even with failures on replica nodes, which raises availability. Records of failed write operations are kept on a different node, and the operation can be retried later (this is called "hinted handoff"). Since the success of replicated writes is not guaranteed, data consistency is checked at read time. Generally, if there are multiple replicas, they are collected into one result when reading. However, Cassandra keeps in mind that not all replicas may match, reads the data from the replicas, checks whether it is identical, and restores the latest data if it is not (this is called "read repair").

HAZELCAST
With compute nodes doing the processing, an in-memory data grid solution is great for building applications that require real-time latency.

MONGODB
is mmap’d linked lists of BSON documents with B-tree indexing.

OSGI - JBoss Fuse
OSGi can help in managing "ClassNotFoundExceptions" by ensuring that code dependencies are satisfied before allowing the code to execute. It also helps with running multiple versions of jars and with hot deployments and life-cycle management.
  • OSGi verifies that the set of dependencies is consistent with respect to required versions and other constraints.
  • Package applications as logically independent JAR files and be able to deploy them (gracefully, without affecting or managing others).
  • Manage a new level of code visibility in a bundle (JAR): public classes can be made unavailable, which is not possible with Java access specifier semantics (private, protected, public, package), making deployment/upgrades easier.
  • Extensibility mechanism, plugin-like (Eclipse, NetBeans).
  • Helps to build a dynamic service model in your application, where services can be registered and discovered both declaratively and programmatically during execution.
  • Shell scripting for the JVM, to investigate issues.
  • The JVM footprint for special-purpose installs could be greatly reduced (with positive implications for disk, memory, and security).
  • Although the benefits look great, with other systems (containers) sharing the same jars (Storm, Hazelcast, etc.), I don't think it is worth spending effort on OSGi.
It is always better to streamline library versions and manage them in a consistent manner. It is a nightmare with OSGi containers & we ended up solving jar issues most of the time. I think OSGi is not worth the effort when we have different containers with shared jars.

References:
http://wiki.apache.org/cassandra/ArticlesAndPresentations
http://blogs.atlassian.com/2013/09/do-you-know-cassandra/

