Here are my thoughts on working with NoSQL technologies over the past year and more, while building a platform in the archival/e-discovery space, where the cost of data storage matters a lot. It is very difficult to claim expertise in every technology trend, especially in NoSQL, where technologies and use cases overlap within a problem domain and it is not easy to pick one over another. There is also a cost to maintaining multiple data sources (code base and relationships). Here I give an overview of the technologies we picked. In the next set of write-ups I plan to document the major issues and best parts of each stack separately, to help developers make informed technical decisions, especially with my favorites, "Elasticsearch" and "Hazelcast", used through Spring Data wrapper libraries.
I guess the paradigm shift is "Think about the queries (business) and horizontal scaling, and work backwards," rather than a data-first approach (storage, integrity) where the queries are then made to work on the model. Data storage is driven by query usage patterns (functional requirements) and by the cost of building and maintaining it. Latency and storage costs have moved from non-functional to basic functional requirements, and distributed computing has become a basic necessity, not a luxury. In the NoSQL context data is still king, but the king is at the mercy of query access patterns and storage cost. At the end of the day, most software is about storing and retrieving useful data in a cost-effective way. Performance and transactions should not be the reason to abandon an RDBMS; distribution of data (because of scale) and ease of development are. Shortening the development cycle, lowering costs and enabling new ideas should be the goal of any technology professional, and non-relational data stores increasingly fit the bill.
There is no single silver bullet for data storage (or for anything, for that matter) that serves a serious project; it has to be polyglot persistence, and the trend looks irreversible. The important point is that each store provides its scalability and performance by offering a limited data model and compromising on one of the properties in CAP (Consistency, Availability, Partition tolerance). Yet everyone claims to do almost anything: name a feature, and the evangelists of some NoSQL store come up with a post saying "Look ma! I can also do this." It is important to make informed decisions after doing POCs, making sure a store is not stretching itself too far to achieve the goal (latency, storage efficiency). The key for us was to pick the storage nodes and the technology behind them based on their sweet spots, after evaluating them against their counterparts.
Based on our use cases we made the following decisions:
Search by key - Document store (MongoDB): serves most RDBMS-style requirements. Ex: blog
Search by term or value - Full-text search (Elasticsearch): the ability to look for text and then the document. Ex: conversations, documents
Search and store by order - Column family (Cassandra): sustains constant writes, usually of time-series data. Ex: Twitter
Search for large binary objects one at a time - Object stores provided by cloud vendors, Ceph
Search/process for results in milliseconds - In-memory data grid (Hazelcast)
Although all of these are scalable solutions, Cassandra stands out for linear scalability, offering efficient reads (for range-based queries) and writes irrespective of data size: a column family keeps the columns that belong together, and the append-only write path makes writes faster than reads and independent of the current size of the data.
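To see why an append-only write path keeps writes cheap regardless of data size, here is a toy log-structured store in the spirit of Cassandra's commit log, memtable and SSTables. It is a minimal sketch, not Cassandra code: the class name `ToyLsmStore` and the flush threshold are hypothetical, and there is no compaction or disk I/O.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical toy log-structured store: append-only writes, reads that may check several tables.
class ToyLsmStore {
    private final List<String> commitLog = new ArrayList<>();                // append-only durability log
    private TreeMap<String, String> memtable = new TreeMap<>();              // sorted in-memory writes
    private final List<TreeMap<String, String>> flushed = new ArrayList<>(); // frozen tables, oldest first
    private final int flushThreshold;

    ToyLsmStore(int flushThreshold) {
        this.flushThreshold = flushThreshold;
    }

    // A write never reads or rewrites existing data: one log append plus one insert into a
    // memtable whose size is bounded by the flush threshold, not by the total data stored.
    void put(String key, String value) {
        commitLog.add(key + "=" + value);
        memtable.put(key, value);
        if (memtable.size() >= flushThreshold) {
            flushed.add(memtable);      // freeze the full memtable; it is never modified again
            memtable = new TreeMap<>();
        }
    }

    // A read checks the memtable first, then the flushed tables from newest to oldest,
    // so the most recent value for a key wins. This extra work is why reads cost more.
    String get(String key) {
        String value = memtable.get(key);
        if (value != null) return value;
        for (int i = flushed.size() - 1; i >= 0; i--) {
            value = flushed.get(i).get(key);
            if (value != null) return value;
        }
        return null;
    }

    int flushedTableCount() { return flushed.size(); }
    int commitLogSize() { return commitLog.size(); }
}
```

Real Cassandra also compacts flushed SSTables in the background so that reads don't have to touch too many of them; the sketch leaves that out.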
Our future plan is to use a graph database:
Search by relationship - Graph DB (Neo4j): users and their relationships. Ex: social graph
Although there are numerous links suggesting selection criteria, I found the following ones interesting:
http://blog.nahurst.com/visual-guide-to-nosql-systems - explains the CAP theorem and what you give up among the three (Consistency, Availability, Partition tolerance), mapped to the most prominent solutions in this space.
http://highscalability.com/blog/2011/6/20/35-use-cases-for-choosing-your-next-nosql-database.html - documents use cases.
http://kkovacs.eu/cassandra-vs-mongodb-vs-couchdb-vs-redis - easy-to-follow, well-documented introductions.
KAFKA
A distributed pub-sub messaging system designed for throughput. It is maintained and used at LinkedIn and, more importantly for me, written in Scala, which is my favorite language these days. Kafka is log-centric, ordered, immutable and sequence-based (offsets). Persistence and replication are built in. It concentrates on being a durable, scalable message storage system and does that pretty well.
As its author defines it, "Kafka is a system that supports long retention of ordered data". It is mainly used as a pipeline that pushes data into the system in a replayable way.
Buzzwords:
topic - category of data.
partition - an ordered log of records (retained for a configured duration: a week, a month, ...)
log sequence number (offset) - acts as the state of the system.
consumer/producer - consumers pull messages, producers push them.
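The buzzwords above fit together roughly like this: a topic is split into partitions, each partition is an append-only log, a record's offset is its position in that log, and each consumer only needs to remember its own offset to resume or replay. The sketch below is an in-memory illustration of that model, not the Kafka client API; `ToyTopic` and `ToyConsumer` are hypothetical names, and `String.hashCode()` is a stand-in for whatever hash the real partitioner uses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of a topic made of append-only partition logs.
class ToyTopic {
    private final List<List<String>> partitions = new ArrayList<>();

    ToyTopic(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) partitions.add(new ArrayList<>());
    }

    // Producer side: records with the same key land in the same partition, so their
    // relative order is preserved. Returns the offset assigned to the record.
    long append(String key, String value) {
        List<String> log = partitions.get(partitionFor(key));
        log.add(value);
        return log.size() - 1;   // offset = position in the partition log
    }

    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), partitions.size());
    }

    // Reads never modify or remove records, so any offset can be read again.
    List<String> read(int partition, long fromOffset, int maxRecords) {
        List<String> log = partitions.get(partition);
        int start = (int) Math.min(fromOffset, log.size());
        int end = Math.min(start + maxRecords, log.size());
        return new ArrayList<>(log.subList(start, end));
    }
}

// Consumer side: the offset it holds is its entire position ("state") in the partition.
class ToyConsumer {
    private final ToyTopic topic;
    private final int partition;
    private long offset;

    ToyConsumer(ToyTopic topic, int partition, long startOffset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = startOffset;
    }

    List<String> poll(int maxRecords) {
        List<String> batch = topic.read(partition, offset, maxRecords);
        offset += batch.size();   // advance only past what was actually returned
        return batch;
    }

    void seek(long newOffset) { offset = newOffset; }   // replay from an earlier offset
    long position() { return offset; }
}
```

In real Kafka the consumer's position is committed somewhere durable (the older high-level consumer committed offsets to ZooKeeper), which is how a restarted consumer picks up where it left off.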
Biggest success story:
LinkedIn claims to process 175 TB of in-flight data with ~1.5 ms latency, at 7 million writes and 35 million reads per second. I heard this from the Kafka author himself when I recently listened to a webinar hosted by O'Reilly.
Uses:
Zookeeper
Issues:
There is no easy way to trap consumer rebalancing events. The Scala code is difficult to work with, especially alongside existing Java code.
STORM
Storm is like a pipeline where you push individual events, which are then processed in a distributed fashion. It makes it easy to process massive streams of data in a scalable way, and provides mechanisms for things like guaranteeing that the data will be processed.
There is always some confusion about its relationship with Hadoop. Storm's creator clearly says they are complementary and offers the following rule of thumb for adopting it:
"Any time you need to look at data historically (already saved), use batch processing (i.e. Hadoop), as it's cheap and scalable; whenever you need to look at all the data once, then anything you do as it comes in in real time, use Storm for that."
Storm's ability to run multiple copies of a bolt's or spout's code in parallel (depending on the parallelism setting) is the crux of the solution. It helps scale to insane numbers by taking care of cluster communication, fault-tolerant fail-over, scalable and durable ordered messaging, and distributing topologies across cluster nodes. Each worker can be assigned processes, memory and threads (tasks) according to its requirements, which makes Storm a good fit for any growing company with horizontal scaling requirements.
Buzzwords:
Spout - a source of streams in a topology, i.e. its starting point.
Bolt - any piece of code that does arbitrary processing on incoming tuples.
tuple - the basic unit of data that gets emitted at each step.
topology - the layout of the communication, which is static and defined upfront; a deployable workflow that ties spouts and bolts together. It can also be described as a graph of spouts (which produce tuples) and bolts (which transform tuples).
Nimbus - similar to Hadoop's JobTracker; the service that tracks all the workers executing the bolts.
supervisor - runs on each node and manages its worker processes (each worker is a collection of executors).
Spouts are designed to poll; we can't push to them. So we use Kafka to hold the messages, and the spout acts as a Kafka consumer that triggers the indexing process.
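To make the spout/bolt vocabulary concrete, here is a single-process sketch of a tiny topology. The spout is polled for its next tuple (mirroring how Storm repeatedly calls a spout's `nextTuple()` instead of pushing data into it), and the bolt stage has several parallel tasks, with each tuple routed to a task by hashing its value (the idea behind Storm's fields grouping). Everything here, including `PollingSpout`, `CountBolt` and `ToyTopology`, is hypothetical and uses no Storm API; the queue stands in for the Kafka topic the spout reads from.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical spout: the "framework" polls it, and it pulls from an upstream queue (standing in for Kafka).
class PollingSpout {
    private final Queue<String> upstream;

    PollingSpout(Queue<String> upstream) { this.upstream = upstream; }

    // Returns the next tuple, or null when nothing is available right now.
    String nextTuple() { return upstream.poll(); }
}

// Hypothetical bolt: counts words; each parallel task keeps its own local state.
class CountBolt {
    final Map<String, Integer> counts = new HashMap<>();

    void execute(String word) { counts.merge(word, 1, Integer::sum); }
}

class ToyTopology {
    private final PollingSpout spout;
    private final List<CountBolt> tasks = new ArrayList<>();

    ToyTopology(PollingSpout spout, int parallelism) {
        this.spout = spout;
        for (int i = 0; i < parallelism; i++) tasks.add(new CountBolt());
    }

    // Fields-grouping style routing: the same word always goes to the same task,
    // so per-word counts stay correct even though the work is split across tasks.
    int taskFor(String word) { return Math.floorMod(word.hashCode(), tasks.size()); }

    // Drive the topology until the spout has nothing more to emit.
    void runUntilIdle() {
        String tuple;
        while ((tuple = spout.nextTuple()) != null) {
            tasks.get(taskFor(tuple)).execute(tuple);
        }
    }

    int countOf(String word) {
        return tasks.get(taskFor(word)).counts.getOrDefault(word, 0);
    }
}
```

In a real cluster the tasks would run in different executors and workers, and the routing would cross threads (Disruptor) or the network (ZeroMQ/Netty); the sketch only keeps the data flow.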
Built on:
LMAX Disruptor: intra-worker communication in Storm (inter-thread on the same Storm node)
ZeroMQ or Netty: Inter-worker communication (node-to-node across the network)
Zookeeper: cluster management of Storm (automatic fail-over, automatic balancing of workers across the cluster)
Biggest success story:
Twitter claims to index four hundred million tweets within 50 milliseconds with the help of Storm.
Issues:
Inter-topology communication: nothing is built into Storm (we used Kafka). The UI isn't mature. Storm does not handle OOM-type errors well. It's tough to manage state (errors, progress) across bolts.
Samza and Spark also try to solve problems similar to the ones Storm solves. Here are some notes comparing Storm with Spark, which is apparently doing great and seems to have the largest momentum.
Storm Vs Spark
Both frameworks are used to parallelize computations over massive amounts of data. With Storm, you move data to code; with Spark, you move code to data (data parallelism). Spark is based on the idea that when the existing data volume is huge, it is cheaper to move the processing to the data (similar to Hadoop map/reduce, except that memory is aggressively used to avoid I/O, which makes it efficient for iterative algorithms).
However, Storm is good at dynamically processing numerous small generated/collected data items (such as computing an aggregation function or analytics in real time on a Twitter stream), also called task parallelism. Spark works on a corpus of existing data (like Hadoop) that has been imported into the Spark cluster, provides fast scanning thanks to in-memory management, and minimizes the overall number of I/Os for iterative algorithms.
Storm is like a pipeline where you push individual events, which are then processed in a distributed fashion.
Spark (Spark Streaming) instead follows a model where events are collected and then processed in batches at short time intervals (a few seconds).
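The micro-batch model can be illustrated by grouping timestamped events into fixed intervals and processing each group as one small batch, instead of handling every event the moment it arrives. A minimal sketch, assuming events carry a non-negative millisecond timestamp and a hypothetical batch interval; `TimedEvent` and `MicroBatcher` are illustrative names, not the Spark Streaming API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical event with an arrival timestamp in milliseconds.
final class TimedEvent {
    final long timestampMs;
    final int value;

    TimedEvent(long timestampMs, int value) {
        this.timestampMs = timestampMs;
        this.value = value;
    }
}

class MicroBatcher {
    // Groups events into consecutive intervals of batchIntervalMs, keyed by interval start time.
    static Map<Long, List<TimedEvent>> toBatches(List<TimedEvent> events, long batchIntervalMs) {
        Map<Long, List<TimedEvent>> batches = new TreeMap<>();
        for (TimedEvent e : events) {
            long batchStart = (e.timestampMs / batchIntervalMs) * batchIntervalMs;
            batches.computeIfAbsent(batchStart, k -> new ArrayList<>()).add(e);
        }
        return batches;
    }

    // Each batch is processed as one unit (here: a sum), giving one result per interval.
    static Map<Long, Integer> sumPerBatch(List<TimedEvent> events, long batchIntervalMs) {
        Map<Long, Integer> sums = new TreeMap<>();
        toBatches(events, batchIntervalMs).forEach((start, batch) ->
                sums.put(start, batch.stream().mapToInt(ev -> ev.value).sum()));
        return sums;
    }
}
```

With a 2-second interval, events at 100 ms and 1900 ms end up in the same batch, so results are only available at batch granularity; that is the latency trade-off compared to Storm's per-event model.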
Spark's approach to fault tolerance is that, instead of persisting or checkpointing intermediate results, Spark remembers the sequence of operations that led to a certain data set. So when a node fails, Spark reconstructs the data set from this stored information.
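The "remember the operations, not the results" idea fits in a few lines: a dataset keeps a reference to its re-readable source plus the list of transformations applied to it, so a lost in-memory result can be recomputed instead of restored from a checkpoint. A minimal sketch under that assumption; `LineageDataset` is hypothetical and much simpler than Spark's RDDs, which track lineage per partition.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical dataset that records how it was derived instead of checkpointing results.
class LineageDataset {
    private final Supplier<List<Integer>> source;                       // re-readable input, e.g. stable storage
    private final List<Function<List<Integer>, List<Integer>>> lineage; // recorded transformations, in order
    private List<Integer> materialized;                                 // computed result; may be lost

    LineageDataset(Supplier<List<Integer>> source) {
        this(source, new ArrayList<>());
    }

    private LineageDataset(Supplier<List<Integer>> source,
                           List<Function<List<Integer>, List<Integer>>> lineage) {
        this.source = source;
        this.lineage = lineage;
    }

    // Transformations are lazy: each returns a new dataset with a longer lineage.
    LineageDataset map(Function<Integer, Integer> f) {
        return derive(in -> in.stream().map(f).collect(Collectors.toList()));
    }

    LineageDataset filter(Predicate<Integer> p) {
        return derive(in -> in.stream().filter(p).collect(Collectors.toList()));
    }

    private LineageDataset derive(Function<List<Integer>, List<Integer>> step) {
        List<Function<List<Integer>, List<Integer>>> next = new ArrayList<>(lineage);
        next.add(step);
        return new LineageDataset(source, next);
    }

    // Computes (or recomputes) the data by replaying the lineage from the source.
    List<Integer> collect() {
        if (materialized == null) {
            List<Integer> data = source.get();
            for (Function<List<Integer>, List<Integer>> step : lineage) {
                data = step.apply(data);
            }
            materialized = Collections.unmodifiableList(data);
        }
        return materialized;
    }

    // Simulates losing the in-memory result, e.g. because the node holding it failed.
    void loseMaterializedData() { materialized = null; }
}
```

The cost of this design is recomputation time after a failure; the benefit is that the normal path never pays for writing intermediate results to disk.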
LOGSTASH
Provides a centralized logging server that listens for connections from Java application servers, accepts streams of logs when they connect, and filters, modifies and routes those streams to the appropriate outputs. It's one of the cornerstone products of the Elasticsearch family.
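Logstash itself is configured in its own config language with input, filter and output sections; the Java sketch below only illustrates the filter, modify and route idea on plain log lines. The rules (drop DEBUG lines, tag ERROR lines, route errors to an "alerts" output and everything else to "archive") and the `LogRouter` class are hypothetical examples, not Logstash code or configuration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical filter-and-route stage for log lines of the form "LEVEL message".
class LogRouter {
    final Map<String, List<String>> outputs = new HashMap<>();

    void accept(String line) {
        int space = line.indexOf(' ');
        String level = space < 0 ? line : line.substring(0, space);
        String message = space < 0 ? "" : line.substring(space + 1);

        if (level.equals("DEBUG")) return;                      // filter: drop noisy events

        boolean isError = level.equals("ERROR");
        String event = isError ? "[alert] " + message : message; // modify: tag errors
        String output = isError ? "alerts" : "archive";          // route by level
        outputs.computeIfAbsent(output, k -> new ArrayList<>()).add(event);
    }
}
```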
ELASTICSEARCH
A distributed (through shards) and highly available (through replicas) search engine. It is document-oriented and dynamic, and provides reliable, asynchronous write-behind for long-term persistence with (near) real-time search support.
Buzzwords:
Shard: a single Lucene instance
Replica: a copy of a primary shard, for failover and performance
Index type: similar to a table
An index can be split into a configurable number of shards, and each shard can have one or more replicas.
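The reason the shard count is fixed while the replica count is not comes from how documents are routed: Elasticsearch documents the rule as `shard = hash(_routing) % number_of_primary_shards`, where `_routing` defaults to the document id. The sketch below uses `String.hashCode()` as a stand-in for Elasticsearch's real hash function, and `ShardRouter` is a hypothetical name.

```java
import java.util.List;

// Sketch of document-to-shard routing: shard = hash(routing) % numberOfPrimaryShards.
// String.hashCode() stands in for Elasticsearch's actual hash function.
class ShardRouter {
    private final int numberOfPrimaryShards;

    ShardRouter(int numberOfPrimaryShards) {
        this.numberOfPrimaryShards = numberOfPrimaryShards;
    }

    int shardFor(String routing) {
        return Math.floorMod(routing.hashCode(), numberOfPrimaryShards);
    }

    // Counts how many document ids would land on a different shard under another shard count.
    static long movedDocs(List<String> ids, int oldShards, int newShards) {
        ShardRouter before = new ShardRouter(oldShards);
        ShardRouter after = new ShardRouter(newShards);
        return ids.stream().filter(id -> before.shardFor(id) != after.shardFor(id)).count();
    }
}
```

Because the modulus is the primary shard count, changing it would send most existing documents to a different shard, so existing data would have to be reindexed. Replicas are just copies of a shard and play no part in routing, which is why they can be added or removed at runtime.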
Used the native Java ES API (through the Spring Data wrapper) instead of HTTP.
Increased the refresh interval (index.engine.robin.refresh_interval); the default is 1 second.
Increased the indexing buffer size (indices.memory.index_buffer_size), which defaults to 10% of the heap.
Increased the number of dirty operations that trigger an automatic flush by setting index.translog.flush_threshold (default is 5000). The flush is what keeps the translog from getting really big, even though it is file-system based.
Increased the memory allocated to the Elasticsearch node (the default is 1 GB).
Decreased the replica count (even to 0) during indexing, to increase it later for HA and search performance. The replica count can be changed at runtime, but the number of shards cannot.
Increased the number of machines so that fewer shards are allocated per machine.
Used scan mode for searches that need to go through all results.
Planning to use SSDs for the translog and check the performance.
Lucene (the search engine behind Elasticsearch) requires these structures to be in RAM, so we should pay attention to "deleted docs", "norms", "terms index", "field cache" and "doc values".
In Lucene each term is stored once per field, and a search works by locating the term and then finding its documents. So the index size relative to the content starts high, decreases, and stabilizes once most of the terms have been indexed. Don't worry about the initial numbers; it usually ends up at 20-30% of the content size.
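The structure described above is an inverted index: each distinct term is stored once per field together with a postings list of the documents that contain it. A toy version also shows why growth slows down: once most terms have been seen, new documents only add postings, not new terms. This is a minimal sketch; `ToyInvertedIndex` is hypothetical, tokenizes on whitespace and lowercases only, and has none of Lucene's compression.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

class ToyInvertedIndex {
    // field -> term -> sorted doc ids (the postings list); each term is stored once per field
    private final Map<String, Map<String, SortedSet<Integer>>> fields = new HashMap<>();

    void add(int docId, String field, String text) {
        Map<String, SortedSet<Integer>> terms = fields.computeIfAbsent(field, f -> new TreeMap<>());
        for (String token : text.toLowerCase().split("\\s+")) {
            if (token.isEmpty()) continue;
            terms.computeIfAbsent(token, t -> new TreeSet<>()).add(docId);
        }
    }

    // Search: locate the term first, then read the documents from its postings list.
    SortedSet<Integer> search(String field, String term) {
        SortedSet<Integer> postings = fields.getOrDefault(field, Collections.emptyMap())
                .getOrDefault(term.toLowerCase(), Collections.emptySortedSet());
        return Collections.unmodifiableSortedSet(postings);
    }

    int distinctTerms(String field) {
        return fields.getOrDefault(field, Collections.emptyMap()).size();
    }
}
```

Indexing "the quick brown fox", "the lazy dog" and then "The quick dog" gives 6 distinct terms; the third document adds postings but no new terms.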
CASSANDRA
A column-oriented (column family) data store that is eventually consistent.
Buzzwords:
Column: the basic element, a tuple composed of a column name, a column value and a timestamp. The timestamp is set by the client, which has an architectural impact: the clients' clocks need to be synchronized.
SuperColumn: a slightly more complex structure. You can imagine it as a column that stores a dynamic list of Columns.
ColumnFamily: a set of columns.
KeySpace: a set of ColumnFamilies.
Row: in the Cassandra sense, a list of Columns or SuperColumns identified by a row key.
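These definitions are often explained as nested sorted maps: a keyspace holds column families, a column family holds rows by row key, and a row holds columns sorted by column name. The sketch below encodes that mental model and the timestamp rule from the Column definition (the write with the newer client timestamp wins). The class names are hypothetical, super columns are left out, and nothing about storage or distribution is modeled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// A column: name, value and the client-supplied timestamp.
final class Column {
    final String name;
    final String value;
    final long timestamp;

    Column(String name, String value, long timestamp) {
        this.name = name;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Keyspace: column family name -> row key -> columns sorted by name.
class ToyKeyspace {
    private final Map<String, Map<String, SortedMap<String, Column>>> columnFamilies = new HashMap<>();

    void insert(String columnFamily, String rowKey, Column column) {
        SortedMap<String, Column> row = columnFamilies
                .computeIfAbsent(columnFamily, cf -> new HashMap<>())
                .computeIfAbsent(rowKey, k -> new TreeMap<>());
        Column existing = row.get(column.name);
        // Conflicts are resolved by client timestamp (ties keep the existing value in this sketch),
        // which is why skewed client clocks can make a newer write lose.
        if (existing == null || column.timestamp > existing.timestamp) {
            row.put(column.name, column);
        }
    }

    // Columns in a row are sorted by name, so a range of columns is a cheap slice.
    SortedMap<String, Column> slice(String columnFamily, String rowKey, String fromInclusive, String toExclusive) {
        SortedMap<String, Column> row = columnFamilies
                .getOrDefault(columnFamily, new HashMap<>())
                .getOrDefault(rowKey, new TreeMap<>());
        return row.subMap(fromInclusive, toExclusive);
    }
}
```

Using dates as column names, as in the test, is the time-series pattern mentioned earlier: one row per user, one column per event, read back as an ordered slice.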
Cassandra does not wait for every one of the N replicas (N = 3 here) to confirm a write: it returns as soon as the number of replicas required by the write consistency level have acknowledged it. A write can therefore succeed even when some replica nodes have failed, which raises availability. Writes that a failed replica missed are recorded on a different node and replayed to it later (this is called "hinted handoff"). Since successful replication of every write is not guaranteed, consistency is checked at read time. Cassandra assumes that not all replicas match: it reads the data from the replicas (all three in this example), checks whether they are identical, and writes the latest data back to any replica that is out of date (this is called "read repair").
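The write and read paths above can be simulated for a single replica set: a write is accepted when enough replicas are live, down replicas get a hint that is replayed when they recover, and a read takes the newest timestamp among the replicas it contacts and repairs the stale ones. A minimal sketch under those simplifying assumptions (every live replica applies a write synchronously; read repair only touches contacted replicas); `Versioned`, `Replica` and `ToyReplicaSet` are hypothetical, not Cassandra's internals.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A value tagged with its write timestamp; the newest timestamp wins.
final class Versioned {
    final String value;
    final long timestamp;

    Versioned(String value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

class Replica {
    boolean up = true;
    final Map<String, Versioned> data = new HashMap<>();

    void apply(String key, Versioned v) {
        Versioned current = data.get(key);
        if (current == null || v.timestamp > current.timestamp) data.put(key, v);
    }
}

// Hypothetical coordinator for one replica set of size N (the replication factor).
class ToyReplicaSet {
    final List<Replica> replicas = new ArrayList<>();
    private final Map<Integer, Map<String, Versioned>> hints = new HashMap<>(); // replica index -> missed writes

    ToyReplicaSet(int replicationFactor) {
        for (int i = 0; i < replicationFactor; i++) replicas.add(new Replica());
    }

    // Accepted if at least writeConsistency replicas are live; live replicas apply it,
    // down replicas get a hint instead (hinted handoff).
    boolean write(String key, String value, long timestamp, int writeConsistency) {
        long live = replicas.stream().filter(r -> r.up).count();
        if (live < writeConsistency) return false;   // unavailable: rejected up front
        Versioned v = new Versioned(value, timestamp);
        for (int i = 0; i < replicas.size(); i++) {
            Replica r = replicas.get(i);
            if (r.up) {
                r.apply(key, v);
            } else {
                hints.computeIfAbsent(i, k -> new HashMap<>())
                     .merge(key, v, (old, nu) -> nu.timestamp > old.timestamp ? nu : old);
            }
        }
        return true;
    }

    // The replica comes back and receives the writes it missed.
    void recover(int index) {
        Replica r = replicas.get(index);
        r.up = true;
        Map<String, Versioned> missed = hints.remove(index);
        if (missed != null) missed.forEach(r::apply);
    }

    // Reads from readConsistency live replicas, returns the newest value and
    // writes it back to any contacted replica that was stale (read repair).
    String read(String key, int readConsistency) {
        List<Replica> contacted = new ArrayList<>();
        for (Replica r : replicas) {
            if (r.up && contacted.size() < readConsistency) contacted.add(r);
        }
        if (contacted.size() < readConsistency) throw new IllegalStateException("not enough live replicas");
        Versioned newest = null;
        for (Replica r : contacted) {
            Versioned v = r.data.get(key);
            if (v != null && (newest == null || v.timestamp > newest.timestamp)) newest = v;
        }
        if (newest == null) return null;
        for (Replica r : contacted) r.apply(key, newest);   // read repair
        return newest.value;
    }
}
```

With N = 3, using 2 for both the write and the read consistency (a quorum) means every read set overlaps every acknowledged write set, because 2 + 2 > 3.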
HAZELCAST
We use it for the compute nodes that do the processing; an in-memory data grid is great for building applications that require real-time latency.
MONGODB
It is essentially memory-mapped (mmap'd) linked lists of BSON documents with B-tree indexes.
OSGI - JBoss Fuse
OSGi can help with "ClassNotFoundException"s by ensuring that code dependencies are satisfied before allowing the code to execute. It also helps with running multiple versions of a jar side by side and with hot deployments through life-cycle management.
- OSGi verifies that the set of dependencies is consistent with respect to required versions and other constraints.
- Package the application as logically independent JAR files and deploy them gracefully, without affecting or managing the others.
- Manage a new level of code visibility in a bundle (JAR): public classes can be made unavailable, which is not possible with Java access-modifier semantics (private, protected, public, package), making deployments and upgrades easier.
- An extensibility mechanism like plugins (Eclipse, NetBeans).
- Helps build a dynamic service model in your application, where services can be registered and discovered both declaratively and programmatically at runtime.
- A shell for investigating the running JVM.
- The JVM footprint for special-purpose installs could be greatly reduced (with positive implications for disk, memory and security).
- Although the benefits look great, with other systems (containers) sharing the same jars (Storm, Hazelcast, etc.), I don't think it's worth spending effort on OSGi.
It's always better to standardize on library versions and manage them consistently. OSGi containers are a nightmare here, and we ended up solving jar issues most of the time.
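The first benefit listed above, checking that dependencies are consistent with the required versions before code runs, boils down to matching each bundle's imported version ranges against the versions other bundles export. Here is a simplified sketch of the OSGi range notation: `[1.2,2.0)` means at least 1.2 and below 2.0, and a bare `1.2` means 1.2 or later. Qualifiers and the rest of the resolver are ignored, and `VersionRange` here is a hypothetical class, not the OSGi framework's own implementation.

```java
import java.util.Arrays;

// Simplified OSGi-style version range: "[1.2,2.0)", "(1.0,1.5]", or a bare "1.2" meaning 1.2 or later.
final class VersionRange {
    private final int[] low;
    private final int[] high;                  // null means unbounded
    private final boolean lowInclusive;
    private final boolean highInclusive;

    VersionRange(String spec) {
        spec = spec.trim();
        if (spec.startsWith("[") || spec.startsWith("(")) {
            String[] parts = spec.substring(1, spec.length() - 1).split(",");
            lowInclusive = spec.charAt(0) == '[';
            highInclusive = spec.charAt(spec.length() - 1) == ']';
            low = parse(parts[0]);
            high = parse(parts[1]);
        } else {
            lowInclusive = true;
            highInclusive = false;
            low = parse(spec);
            high = null;
        }
    }

    // "1.2" -> [1, 2, 0]: missing minor/micro parts default to 0; qualifiers are ignored here.
    static int[] parse(String version) {
        String[] p = version.trim().split("\\.");
        int[] out = new int[3];
        for (int i = 0; i < Math.min(3, p.length); i++) out[i] = Integer.parseInt(p[i]);
        return out;
    }

    // True if the exported version satisfies this imported range.
    boolean includes(String version) {
        int[] v = parse(version);
        int cmpLow = Arrays.compare(v, low);
        if (cmpLow < 0 || (cmpLow == 0 && !lowInclusive)) return false;
        if (high == null) return true;
        int cmpHigh = Arrays.compare(v, high);
        return cmpHigh < 0 || (cmpHigh == 0 && highInclusive);
    }
}
```

A resolver would run this check for every package import against the available exports and refuse to start a bundle when none match, which is how the ClassNotFoundException surfaces at deployment instead of at runtime.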
References:
http://wiki.apache.org/cassandra/ArticlesAndPresentations
http://blogs.atlassian.com/2013/09/do-you-know-cassandra/