Saturday, October 01, 2011

Deciding between ESB (ServiceMix, OpenESB, Mule, etc) vs. Camel

While designing an SOA solution, we often face the question: "Which fits my requirements better, an ESB or Camel?"

I suggest you first list your conceptual requirements, then match them against what Camel or an ESB (ServiceMix, OpenESB, Mule, etc.) provides.

Camel provides:
  • Routing across multiple transports/components
  • Transformation
  • Mediation
  • Orchestration
  • Monitoring

In addition to what Camel offers, an ESB provides:
  • Infrastructure features: support for multiple containers for hosting services (Spring, Camel, JBI, Web, ...). For example, you may want to host new services in a JBI container alongside existing services in an external web container, or publish ad hoc new services (Enterprise Information Integration) on the JBI container.
  • Modularization: OSGi support.
  • Integration platform.

Thursday, September 01, 2011

Crisp overview of Apache Mahout - Machine Learning on Hadoop

Apache Mahout [2] is a machine learning algorithm library based on a research paper published by Stanford University in 2006 [1]. Its algorithms are intended to run as MapReduce jobs on a Hadoop cluster.
The project has implemented various ML (short for Machine Learning) algorithms and grouped them into practical categories, described below.
Recommendation: recommendations made to a user based on that user's behaviour or historical records.
  • Non-distributed (non-Hadoop solutions that can be run with just the Mahout library and Java SE 6)
  • Distributed: Slope One, Distributed Nearest Neighbour (item based), and Distributed Nearest Neighbour (user based)
In simple words, an algorithm like Slope One runs as a two-step MapReduce job. In the first step, the mapper builds the item-pair matrix per user and the reducer computes the rating differences for each item pair. In the second step, the average difference per item pair is computed from the list of differences.
At a high level, the Nearest Neighbour algorithms use a similarity measure such as Pearson correlation to first predict the preferences, and then pick the top preferences.
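The two Slope One steps described above can be illustrated with a minimal single-JVM sketch. This is not Mahout's MapReduce implementation; the class name, method name, and the "A|B" pair-key format are assumptions made only for illustration. The nested loops play the role of the first step (per-user item-pair differences), and the final loop plays the role of the second step (averaging per item pair).

```java
import java.util.HashMap;
import java.util.Map;

final class SlopeOneSketch {
    /** Returns the average rating difference (itemA - itemB), keyed by "itemA|itemB". */
    static Map<String, Double> averageDiffs(Map<String, Map<String, Double>> ratingsByUser) {
        Map<String, Double> sum = new HashMap<>();
        Map<String, Integer> count = new HashMap<>();
        // Step 1: for every user, emit the difference for each pair of rated items.
        for (Map<String, Double> ratings : ratingsByUser.values()) {
            for (Map.Entry<String, Double> a : ratings.entrySet()) {
                for (Map.Entry<String, Double> b : ratings.entrySet()) {
                    if (a.getKey().equals(b.getKey())) {
                        continue;
                    }
                    String pair = a.getKey() + "|" + b.getKey();
                    sum.merge(pair, a.getValue() - b.getValue(), Double::sum);
                    count.merge(pair, 1, Integer::sum);
                }
            }
        }
        // Step 2: average the collected differences per item pair.
        Map<String, Double> average = new HashMap<>();
        for (Map.Entry<String, Double> e : sum.entrySet()) {
            average.put(e.getKey(), e.getValue() / count.get(e.getKey()));
        }
        return average;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Double>> ratings = new HashMap<>();
        ratings.put("u1", Map.of("A", 5.0, "B", 3.0));
        ratings.put("u2", Map.of("A", 4.0, "B", 3.0));
        // Differences for A|B are 2.0 and 1.0, so the average is 1.5.
        System.out.println(averageDiffs(ratings).get("A|B"));
    }
}
```

In a distributed setting the same averages would be accumulated by reducers keyed on the item pair, which is why the computation splits naturally into two jobs.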
Clustering: clustering algorithms work either by joining (the term used is 'agglomerative') or by breaking up (the term used is 'divisive'). Given a huge set of data, we either start with single-item clusters and gradually join them, or start with one big cluster and gradually break it up, based on a 'distance' calculation (again, there are many distance criteria), and eventually arrive at groups of data that are more closely related to each other. Many clustering algorithms are integrated into the Mahout library.
k-means, fuzzy k-means and canopy are a few well-known clustering algorithms. Survey and market research data suit Mahout's divisive clustering algorithms well. The k-means problem is known to be NP-hard (reminds me of my engineering days and computation science).
Classification: by combining the quantitative information or characteristics of a new item with the training set (the items used for previous classifications), we decide the item's category. Tracking, discovery, and recognition are a few common application domains for classification algorithms. Up to the Mahout 0.5 release, Bayesian, Logistic Regression, and Random Forest are integrated, along with partial support for Neural Networks.
Dimension Reduction: probably the best use of parallelism, reducing a complex multi-dimensional dataset to fewer dimensions so that we can analyze the problem. Mahout has an implementation of Singular Value Decomposition (SVD) for this.
For application domains such as social network sentiment analysis, geospatial data analysis, pattern recognition, robotic vision, etc., there are three possible approaches to solving a given problem:
  • Map the problem to an existing Mahout algorithm and provide the data set on HDFS.
  • Implement our own solution for the algorithm as MapReduce programs.
  • Take a hybrid approach: use the core Mahout algorithms and add custom behaviour using MapReduce.

Sunday, August 21, 2011

Using Https URL connection in Java

This is neither a new technology nor something that has not been explored before. I came across a requirement where an https call had to be made from Java code. Given a URL that happens to be https, I needed to collect the server certificate, do the initial handshake (in SSL or its successor, TLS), and finally access the resource on the https site.
Step 1: https server setup
This is the simplest step. Any web server with https set up will do; we just need to extract the server certificate, which will be included in our Java client trust store. I started IIS 7 on my box and exported the server certificate to a file (servercert.pfx).
Step 2: Create jks keystore having server certificate
If you have a jks file, you can export and import certificates using keytool. Since I had a .pfx file, the following had to be done using openssl:
  • Extract key and certificate from PFX certificate to PEM format
    • Extract the key: openssl pkcs12 -nocerts -in <pfx-file> -out <any-pem-extn-key-filename> -passin pass:<pfx-password> -passout pass:<pem-password>
    • Extract the cert: openssl pkcs12 -clcerts -nokeys -in <pfx-file> -out <any-pem-extn-cert-filename> -passin pass:<pfx-password>
  • Convert PEM key and certificate to DER
    • openssl pkcs8 -topk8 -nocrypt -in <pem-extn-key-file> -inform PEM -out <any-der-extn-key-filename> -outform DER -passin pass:<pem-password>
    • openssl x509 -in <pem-extn-cert-file> -inform PEM -out <any-der-extn-cert-filename> -outform DER
  • Use DER key and certificate to make jks keystore
    • You have to create a KeyStore instance, load the bytes from the DER files, and call the setKeyEntry and setCertificateEntry methods. There is a piece of code available at agentbob, under "Use this program" (modify the keystore file name, alias, and keystore password as needed).
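The last step can be sketched with standard JDK classes only. This is a minimal sketch, not the agentbob program: the class name DerToJks, the helper names, the "-cert" alias suffix, and the command-line argument order are all hypothetical, and it assumes the DER key is an unencrypted PKCS#8 RSA key as produced by the openssl pkcs8 command above.

```java
import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyFactory;
import java.security.KeyStore;
import java.security.PrivateKey;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.security.spec.PKCS8EncodedKeySpec;

final class DerToJks {
    /** Parses an unencrypted PKCS#8 DER key (assumption: the key is RSA). */
    static PrivateKey readKey(byte[] keyDer) throws Exception {
        return KeyFactory.getInstance("RSA").generatePrivate(new PKCS8EncodedKeySpec(keyDer));
    }

    /** Parses a DER-encoded X.509 certificate. */
    static Certificate readCert(byte[] certDer) throws Exception {
        return CertificateFactory.getInstance("X.509")
                .generateCertificate(new ByteArrayInputStream(certDer));
    }

    /** Builds an in-memory JKS keystore holding the key (with its chain) and the certificate. */
    static KeyStore build(PrivateKey key, Certificate cert, String alias, char[] password)
            throws Exception {
        KeyStore ks = KeyStore.getInstance("JKS");
        ks.load(null, password); // a null stream creates an empty keystore
        ks.setKeyEntry(alias, key, password, new Certificate[] { cert });
        ks.setCertificateEntry(alias + "-cert", cert); // trusted-certificate entry
        return ks;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 5) {
            System.err.println("usage: DerToJks <key.der> <cert.der> <out.jks> <alias> <password>");
            return;
        }
        char[] password = args[4].toCharArray();
        KeyStore ks = build(readKey(Files.readAllBytes(Paths.get(args[0]))),
                readCert(Files.readAllBytes(Paths.get(args[1]))), args[3], password);
        try (FileOutputStream out = new FileOutputStream(args[2])) {
            ks.store(out, password);
        }
    }
}
```

The same password is used here for the keystore and for the key entry, which keeps the later KeyManagerFactory initialization simple.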
For SSL establishment, though, we only need the server certificate to be verified against the client's copy during the initial handshake. If the client trusts the certificate the server sends during SSL establishment, the client sends a message and the server in turn sends a digitally signed acknowledgement to start the SSL-encrypted session. You may send a client certificate as well, but whether it is accepted, ignored, or required depends on the SSL server's configuration.
Step 3: Write some Java client code to establish SSL socket connection
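We need to set the SSLSocketFactory and HostnameVerifier on the HttpsURLConnection:
HttpsURLConnection httpsConnection = (HttpsURLConnection) url.openConnection();
httpsConnection.setSSLSocketFactory(getSocketFactory());
httpsConnection.setHostnameVerifier(getHostVerifier());
InputStreamReader content = new InputStreamReader(httpsConnection.getInputStream());
// Read the response one character at a time until end of stream
for (int i = content.read(); i != -1; i = content.read())
    System.out.print((char) i);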

The host verifier is an optional component that helps you verify whether your session peer is the one you intended to connect to. It can be written as,

private HostnameVerifier getHostVerifier() {
    HostnameVerifier hostnameVerifier = new HostnameVerifier() {
        public boolean verify(String urlHostName, SSLSession session) {
            // Only logs the two host names; returning true accepts every peer
            System.out.println("Warning: URL Host: " + urlHostName + " v/s " + session.getPeerHost());
            return true;
        }
    };
    return hostnameVerifier;
}

Most importantly, we need to set an SSL socket factory backed by the right keystore and truststore. We can use a TLS instance too; you can get the names of the SSL/TLS algorithms here

private SSLSocketFactory getSocketFactory() throws ...{
    // "TLS" can be requested here instead of "SSLv3"
    SSLContext sctx = SSLContext.getInstance("SSLv3");
    SecureRandom secureRandom = new SecureRandom();
    sctx.init(getKeyManagers(), getTrustManagers(), secureRandom);
    return sctx.getSocketFactory();
}

Key Managers and Trust Managers can be created as,

private KeyManager[] getKeyManagers() throws ...{
    KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
    // keystore is the KeyStore instance loaded further below
    kmf.init(keystore, password.toCharArray());
    return kmf.getKeyManagers();
}

private TrustManager[] getTrustManagers() throws ...{
    TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
    // The factory must be initialized with the keystore before it can return managers
    tmf.init(keystore);
    return tmf.getTrustManagers();
}

Finally, the keystore instance used above in the key manager and trust manager has to be loaded from the server certificate keystore:

File file = new File("servercert.jks");
FileInputStream is = new FileInputStream(file); 
keystore = KeyStore.getInstance("jks"); 
keystore.load(is, keystorePassword.toCharArray());    

Once I had this code knitted together, I received the following error: "HelloRequest followed by an unexpected handshake message". I learned that we need to set a system property,

System.setProperty("", "true");

Once this was done, I was able to make an https call to my IIS-hosted site as,

maker.makeHttpsCall(new URL(;

I hope this helps you establish an SSL connection from Java code.

Monday, May 16, 2011

Tuning Cassandra cluster configuration and improving performance

If you are considering migrating to a NoSQL solution and want a data store that is open source, massively scalable, fault tolerant, peer to peer and column based, there is a high probability that you will choose Apache Cassandra. As I understand it, Apache Cassandra has an edge over Apache HBase when the required data node topology needs to be more robust and peer to peer. Then again, HBase provides tight integration with the Hadoop ecosystem, which can in one way or another also be achieved with Apache Cassandra.
The yaml file is always available to list the configurable parameters, but as a checklist, here are a few important factors to take care of when configuring a Cassandra cluster for a production environment (cloud or non-cloud):

Q: Do we need high availability?
A: If yes, then we need multiple nodes across multiple data centres, and we need to configure the following:

  • Endpoint Snitch: tells Cassandra, by IP address, which nodes are in the same data centre or on the same rack, and which data centres are in the cluster. It can be octet based (RackInferringSnitch), an explicit property file with the node-to-DC mapping (PropertyFileSnitch), or even EC2 region based (Ec2Snitch).
  • Replication Factor: the number of copies of the data we want in the cluster, less than or equal to the number of nodes. 1 means no extra copies (just the original data).
  • Partitioner: controls how data is distributed across nodes. If you want to support range slice queries or provide indexing using Lucene/Solr (over Cassandra), you will need OrderPreservingPartitioner, which carries a risk of hot spots in your cluster. RandomPartitioner takes care of the hot spot problem by computing an MD5 hash of the "data key" to get a "location key" in the range 0..2^127.
  • Seeds: the IP addresses of other nodes in the cluster, which you feed in here.
  • Replica Placement: controls where data replicas are placed (something that adds to HA). OldNetworkTopologyStrategy/RackAwareStrategy places one replica in another DC and the others (if required) on other racks of the same DC. NetworkTopologyStrategy places replicas (as required by the Replication Factor) in each DC and on the racks inside it.
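The random partitioning idea from the list above (an MD5 hash of the data key giving a location key in 0..2^127) can be sketched in a few lines of Java. This is only an illustration of the idea and is not taken from Cassandra's source; the class and method names are hypothetical, and UTF-8 encoding of the key is an assumption.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class Md5Token {
    /** Maps a row key to a non-negative token no larger than 2^127. */
    static BigInteger token(String rowKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(rowKey.getBytes(StandardCharsets.UTF_8));
            // The 16-byte digest read as a signed number lies in [-2^127, 2^127 - 1];
            // abs() folds it into [0, 2^127].
            return new BigInteger(digest).abs();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    public static void main(String[] args) {
        // Similar keys land on unrelated tokens, which is what avoids hot spots.
        for (String key : new String[] { "user1", "user2", "user3" }) {
            System.out.println(key + " -> " + token(key));
        }
    }
}
```

Because neighbouring keys hash to unrelated tokens, ordered range scans over keys are no longer possible, which is the trade-off against the order-preserving partitioner.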
Q: Do you want some new nodes to be added dynamically to the existing cluster?
A: If yes, then you need to set auto_bootstrap to true. For consistent hashing, the initial tokens must divide the keyspace evenly. By specifying initial_token you can dictate the token range the new node is responsible for in the cluster. With RandomPartitioner, the initial token for the kth node (k = 0..n-1) in an n-node cluster can be calculated as k * 2^127 / n.
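An even split of the 0..2^127 token range used by the random partitioner can be computed as in the following sketch. It assumes nodes are numbered k = 0..n-1 and uses integer division; the class and method names are hypothetical.

```java
import java.math.BigInteger;

final class InitialTokens {
    private static final BigInteger RANGE = BigInteger.valueOf(2).pow(127);

    /** Token for node k (0-based) in an n-node cluster: k * 2^127 / n. */
    static BigInteger initialToken(int k, int n) {
        if (n <= 0 || k < 0 || k >= n) {
            throw new IllegalArgumentException("need 0 <= k < n");
        }
        return RANGE.multiply(BigInteger.valueOf(k)).divide(BigInteger.valueOf(n));
    }

    public static void main(String[] args) {
        int nodes = 4;
        for (int k = 0; k < nodes; k++) {
            System.out.println("node " + k + ": initial_token = " + initialToken(k, nodes));
        }
    }
}
```

BigInteger is used because 2^127 does not fit in a long.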

Q: How to ensure consistency across distributed cassandra nodes?
A: Consistency Level, defined at the data access layer (Hector/Thrift/Avro), plays an important role in determining whether the Cassandra cluster may serve old data (if replication has not happened yet). Consistency Level ANY or ONE for writes and ONE for reads gives better performance, but at the cost of eventually consistent data. Higher consistency can be achieved with QUORUM ((N/2)+1), LOCAL_QUORUM ((RF/2)+1 in the local DC), EACH_QUORUM ((RF/2)+1 in each DC), and finally ALL (which blocks until the read/write is served by all replicas).
To sum up, you get strongly consistent data when (consistency level for write + consistency level for read) > replication factor. QUORUM provides consistent data and keeps data available if some nodes fail. If performance/latency is more important, then you can lower the consistency level for READ, WRITE, or both.
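The rule of thumb above (write replicas + read replicas > replication factor) is easy to check in code. The following is a minimal sketch with hypothetical class and method names; it counts replicas that must acknowledge an operation and does not talk to a cluster.

```java
final class ConsistencyCheck {
    /** Number of replicas that form a quorum: (RF / 2) + 1 with integer division. */
    static int quorum(int replicationFactor) {
        return replicationFactor / 2 + 1;
    }

    /** True when every read is guaranteed to overlap the latest write: W + R > RF. */
    static boolean isStronglyConsistent(int writeReplicas, int readReplicas, int replicationFactor) {
        return writeReplicas + readReplicas > replicationFactor;
    }

    public static void main(String[] args) {
        int rf = 3;
        int q = quorum(rf);
        System.out.println("QUORUM for RF=3: " + q);
        System.out.println("QUORUM write + QUORUM read: " + isStronglyConsistent(q, q, rf));
        System.out.println("ONE write + ONE read: " + isStronglyConsistent(1, 1, rf));
    }
}
```

With a replication factor of 3, QUORUM writes and QUORUM reads overlap in at least one replica, while ONE/ONE does not, which is why the latter is only eventually consistent.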

Q: How to achieve performance SLAs?
A: Read performance can be enhanced by tuning a few parameters:

  • A ColumnFamily lets you enable key and/or row caching. You can specify the amount of data to cache as a number of keys/rows, or as a percentage or fraction of the whole data. The key cache is lighter, as the data stored in it is just keys. By default, Cassandra caches 200,000 keys per CF. The key cache saves an I/O to the index file to find the row location for a key, so it is very productive to have. The row cache holds the entire content of a row and is off by default. The overhead of enabling the row cache, or of oversizing it, is that Cassandra may need more JVM heap, which may hurt performance. If the columns are small, we can increase the row cache size without worrying too much about memory consumption, but if the columns are large, increasing the row cache size is tricky. The best way to fine tune it is to watch "RowCacheHitRatio", exposed per CF over JMX, during a sample data run and configure the optimal size according to the utilization. If "RowCacheHitRatio" is too low (below 20-30), which will happen when writes far outnumber reads or reads are extremely random and will in any case trigger high GC activity, then I suggest it doesn't make sense to enable row caching. You can use nodetool cfstats to see the hit ratio of the key/row cache. It takes time to get a good idea of the hit rate because it matures gradually.
  • Read performance can also be enhanced by fine tuning Concurrent Reads; the usual rule is 4 threads per CPU core. The higher the value, the more threads are spawned for reads. If the machines have faster I/O than usual commodity hardware, we can increase this number a bit. Increasing it too much will cause heavy context switching and will not yield further performance gains.
  • Try to use a faster RPC client instead of a wrapper over a wrapped RPC layer.
  • Read performance also gets seriously degraded by contention during SSTable compaction. "NoofLiveSSTables", exposed over JMX (I guess so), gives you a typical indication of how many SSTables are in use for a CF, or else we can use the SSTable count from cfstats. If this number is quite high or increasing, then your read operations may be contending with SSTable compaction. Also, the more SSTables there are, the more fragmented (both internally and externally) your data is. Increasing settings like memtable_flush_after_mins (or memtable_operations_in_millions or memtable_throughput_in_mb {total ram/(1048576*16)}) will decrease memtable flushes, which decreases the number of SSTables and eventually the number of them to be compacted. Hence there are fewer conflicts with read operations, which improves read performance. The larger the value of these variables, the more heap memory you need to hold this in-memory data structure. The thread priority of the compaction thread can also be controlled, which eventually decreases the number of compactions; this is a JVM argument to Cassandra.

Write performance can also be tuned, but do we really need to improve on the lightning-fast writes Cassandra already provides? Anyway, here are some of the ways to achieve it:

  • We can use the bulk loading APIs (StorageProxy, mutation or Binary verb) to write to Cassandra instead of using one of the wrappers.
  • Decreasing the write Consistency Level also helps, as fewer nodes need to be blocked for the write operation.
  • It is preferable to use a separate disk drive for the commit log. I guess this avoids I/O contention between commit log writes and SSTable reads.
  • You can tune concurrent_writes upward from its default of 32. Concurrent writes should be set a bit higher than concurrent reads, between 1x and 1.5x (x = number of concurrent reads). You can set it to around 12x the number of cores.
Swap memory may pose significant challenges to achieving effective read/write performance in a Java application. The operating system may swap pages to and from disk even when sufficient memory is available, and you may observe a difference in performance while swap space is being read or written. You can disable swap with the swapoff command. On a Linux-based OS, you can add vm.swappiness=0 (or 5) to /etc/sysctl.conf to reduce the likelihood of the OS using swap space. Cassandra provides a memory_locking_policy parameter in the yaml, which you need to enable. The JNA library helps you lock JVM memory, making it unevictable so that it cannot be swapped out.
Memory-mapped I/O performs better than regular I/O. On a 64-bit architecture you can use it by setting disk_access_mode in the yaml to mmap.

Q: Do I require to tune JVM for Cassandra cluster setup?
A: Yes, you may want to customize it. One option is to make the MAX and MIN heap sizes the same (to avoid full GCs during heap growth or shrink), since growing the heap is expensive. Alternatively, set MIN to what you assume is the maximum Cassandra will consume and make MAX a bit higher, "just in case" Cassandra is hit with more load; in most cases it is better to do a full GC and grow the heap than to get an OutOfMemoryError and crash. The heap size to allocate to Cassandra can be estimated as approximately memtable_throughput_in_mb * 3 * (number of hot column families) + 1 GB + key cache + row cache. Cassandra defaults to many GC settings that are well configured for big data workloads. For example, with -XX:SurvivorRatio (default 8) we try to retain as many objects as possible in the survivor spaces so that they can be reclaimed by GC; if we increase the size of the survivor spaces, these longer-lived objects get copied between survivors on each minor GC. It is an interesting balance, but it is better to copy more between survivors than to promote more to Tenured. The tenuring distribution can be seen using -XX:+PrintTenuringDistribution.
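The heap-size estimate given above can be turned into a small calculator. This is a rough sketch of that rule of thumb only; the class name, method name, and the sample figures (64 MB memtable throughput, 4 hot column families, 100 MB key cache, 500 MB row cache) are made up for illustration.

```java
final class HeapEstimate {
    private static final long BASE_OVERHEAD_MB = 1024; // the fixed "1 GB" term

    /** memtable_throughput_in_mb * 3 * hot CFs + 1 GB + key cache + row cache, all in MB. */
    static long estimateHeapMb(int memtableThroughputMb, int hotColumnFamilies,
                               long keyCacheMb, long rowCacheMb) {
        return (long) memtableThroughputMb * 3 * hotColumnFamilies
                + BASE_OVERHEAD_MB + keyCacheMb + rowCacheMb;
    }

    public static void main(String[] args) {
        // 64 * 3 * 4 = 768, plus 1024 + 100 + 500 = 2392 MB
        long heapMb = estimateHeapMb(64, 4, 100, 500);
        System.out.println("Estimated heap: " + heapMb + " MB");
    }
}
```

The result is only a starting point for the MIN/MAX heap settings and should be checked against observed heap usage under load.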

We have -XX:+UseParNewGC, the parallel collector for the young generation. It has ergonomics for tuning the young generation: it avoids or decreases the frequency of major GCs, avoids full GCs by avoiding or decreasing promotions, and maximizes use of the heap. We also use CMS (-XX:+UseConcMarkSweepGC) for the old generation; CMS is designed to minimize stop-the-world pauses. The number of concurrent threads in CMS can be controlled with -XX:ParallelCMSThreads=<n>. Pauses during the remark phase can be reduced by enabling -XX:+CMSParallelRemarkEnabled. One more point: by default, classes are not unloaded from the permanent generation when using CMS; this can be enabled with -XX:+CMSClassUnloadingEnabled.

On 64-bit machines, from Java SE 6 update 14 onwards, you can use compressed pointers, which result in smaller heap usage; add -XX:+UseCompressedOops to enable this.

Q: Any hardware changes to boost the performance?
A: Yes, sure.
  • You can use 10 Gigabit Ethernet to increase network throughput.
  • You can add RAM. The extra RAM can be allocated to the Cassandra JVM heap, which in turn benefits many of the factors above.
  • Multi Channel Memory Architecture (MCMA) technology increases the transfer rate between RAM and the memory controller by adding channels; theoretically, the transfer rate gets multiplied by the number of channels. Modern high-end chipsets, such as the i7-9x series or the latest Xeon chipsets, support triple-channel memory architecture, and quad- or eight-channel memory chipsets also exist. You can optimise the RAM for the number of memory channels by installing DIMMs in groups of that size; for example, with triple-channel chipsets, give each server triplets of DIMMs. You have to cross-check the MCMA support of your server's processor as well as its motherboard; once you know this, you can group your DIMMs according to the supported number to extract the best performance from the RAM.
  • On a master or server you typically add more disks as RAID, whereas on slaves, to avoid administrative overhead and to pipeline writes, you should add more disks as JBOD (typically 0.8 disks per core for optimal performance, and no more than 1 disk per core).
  • You can use fast-seeking disks or Solid State Drives (SSDs); SCSI systems also perform better. Typically, SATA drives are recommended over SAS drives.
  • If your filesystem is ext3, you can mount it with the noatime option so that the overhead of access-time updates is reduced. The change needs to be made in /etc/fstab, followed by mount -o remount /. If you have the option, you can use faster filesystems such as ext4 or XFS, which also support bigger file sizes and bigger volumes.
  • You can combine multiple network cards to achieve higher performance.