Wednesday, February 06, 2013

HBase+HDFS: fsck showing Average Block Replication more than Default Replication Factor

At first go, it looks very surprising to see the Average Block Replication higher than the Default Replication Factor; usually we expect it to be equal to, or momentarily lower than, the default replication factor.

On a cluster,
  • I saw an Average Block Replication of 3 whereas the replication factor was 2.
  • Moreover, to make it more interesting, Over-Replicated Blocks were at 0%.

After delving deeper, we figured out that the replication factor on HDFS was set to 2, but
HBase didn't have any replication configuration defined and was therefore defaulting to 3.

So nothing was counted as an over-replicated block: for the files belonging to those HBase tables, the expected replication factor was 3.

It's not a problem as such, just a source of confusion. To avoid it, you can put your hdfs-site.xml on HBase's classpath, or configure the replication factor (dfs.replication) in hbase-site.xml, as in the snippet below.
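
For example, a minimal hbase-site.xml snippet for this scenario (assuming you want HBase to write its files with the same factor of 2 that HDFS uses):

<!-- hbase-site.xml: make HBase write its HDFS files with replication factor 2 -->
<property>
  <name>dfs.replication</name>
  <value>2</value>
</property>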

Friday, June 22, 2012

Jumbune - MapReduce Execution Flow Profiler



Welcome Jumbune [juhm-b-yoon], the industry’s first MapReduce Flow Profiler that will help you to:

  • Analyze cluster-wide Hadoop MapReduce job flow execution
  • Profile MapReduce jobs
  • Monitor Hadoop clusters
  • Validate HDFS data

Jumbune - High level Overview
Where Jumbune Helps:
Hadoop has become the vocabulary of Big Data. Every enterprise organization that wants to analyze its terabytes or petabytes of data is either actively evaluating Hadoop or has already started using Hadoop with its ecosystem of technologies. The scale of this processing invariably runs into clusters of tens or hundreds of machines.

The parallelism of computation brings its own set of programming challenges, including identification of faults across MapReduce logic, discrepancies in working data or analytics logic, or just an unhealthy node.

As a MapReduce developer, it is important for you to profile your code, validate the input/output data sets, and understand how your MapReduce implementation behaves across the Hadoop cluster for debugging purposes, all of which can be extremely painful.

Jumbune Usage Overview:

Jumbune supports both UI based and shell based execution. Jumbune
execution is triggered by the user by submitting a workflow, expressed by a
simple YAML configuration. The YAML configuration consists of instructions for
individual Jumbune components. Jumbune presents intuitive self-explanatory
reports for the submitted workflow.

We presented it at Hadoop Summit 2012 - San Jose, CA.
We received good and constructive feedback from many technologists there. The Jumbune datasheet can be found here. Please feel free to contact Impetus for any demonstrations, evaluation copies, or queries around it.

Wednesday, March 28, 2012

Data Sampling in Big Data: How Does It Work?

Many big data projects require statistical inferences to be made, and that eventually requires careful data sampling. If you insist on using the whole of your Big Data sets because you think sampling obscures what is important in your data, it implies that you don't trust sampling, which in turn implies that you don't know your data well enough to build samples out of it. Data analysts, data scientists, statisticians, or whatever you want to call them, all use sampling to create intelligent small subsets of huge data sets to derive the best offers, recommendations, and analysis results, and to draw conclusions.

Just to quickly brush up on basic concepts: mean, median and mode can estimate the basic shape of the distribution of data. If they are more or less equal, the data distribution is fairly symmetric. Since the mean gets influenced by outliers or extreme values, there can be data sets where the mean is greater than the median and the mode, which indicates that the distribution leans towards the higher side, i.e. the shape is positively skewed (and negatively skewed in the opposite scenario). This helps in figuring out the shape of the distribution and finding its centre. One level above that, we figure out the concentration of values in the distribution; variance, standard deviation and range help us measure this. Variance is calculated as a division where the numerator is the sum of squared differences of each value from the mean and the denominator is the total number of values. Variance is also referred to as "sigma squared", whereas standard deviation is the positive square root of the variance, denoted, yes, by sigma.
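
A small illustration of these measures in plain Java (made-up values; the outlier pulls the mean above the median, giving a positively skewed shape):

import java.util.Arrays;

public class DistributionShape {
    public static void main(String[] args) {
        double[] values = {2, 3, 3, 4, 4, 4, 5, 5, 30};      // 30 is an outlier
        Arrays.sort(values);
        double mean = 0;
        for (double v : values) mean += v;
        mean /= values.length;
        double median = values[values.length / 2];            // middle element (odd-length array)
        double variance = 0;
        for (double v : values) variance += (v - mean) * (v - mean);
        variance /= values.length;                             // population variance ("sigma squared")
        double stdDev = Math.sqrt(variance);
        System.out.println("mean=" + mean + " median=" + median
                + " variance=" + variance + " stdDev=" + stdDev);
        // the mean (~6.67) sits well above the median (4): a positively skewed shape
    }
}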

If the distribution of the data is symmetric, which means that mean, median and mode are roughly equal, the above can be combined with the empirical rule (instead of Chebyshev's theorem), which tells us that:
around 68% of data values lie within 1 standard deviation of the mean,
around 95% of data values lie within 2 standard deviations of the mean, and
around 99.7% of data values lie within 3 standard deviations of the mean.

In Big Data we work on huge data sets; the data may be the whole data lying either in a NoSQL data store or on HDFS, or it may be past data plus a set of incremental data. We may need to break the data into proportionate subdivisions of the population. Populations are divided into subdivisions by quantiles, and there are different proportions for these subdivisions: the median divides the population into two halves of 50% each, quartiles divide it into 25% each, deciles into 10% each, whereas percentiles divide the population into subdivisions of 1% each. We can use this concept to get a fairly equal distribution of data: compute quantiles over a directory containing the bulk data plus incrementals, divide the data into sub-populations, and then apply any of the sampling designs below to get a reasonably wide-spread sample distribution.
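
A rough quartile illustration over a tiny in-memory array (hypothetical values; over HDFS-scale data you would use an approximate or distributed quantile computation instead):

import java.util.Arrays;

public class Quartiles {
    public static void main(String[] args) {
        double[] data = {7, 1, 9, 4, 12, 3, 8, 5, 10, 2, 6, 11};
        Arrays.sort(data);
        // crude index-based quartiles, enough to see the idea of sub-populations
        System.out.println("Q1 (25%): " + data[data.length / 4]);
        System.out.println("Q2 / median (50%): " + data[data.length / 2]);
        System.out.println("Q3 (75%): " + data[(3 * data.length) / 4]);
    }
}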

There are a couple of interesting data sampling designs; let's see how they work.

Simple Random Samples - Works for batches; occurrence of data elements is simultaneous. Every data element in the data set has an equal probability of being picked into the sample frame. In the implementation, every data element is assigned a random number; once the assignment has been done, the list is sorted by that number and the sample frame is picked from it. Hence, the probability of an element landing in the sample frame is the same across the whole data set. Usually, at least 5% of the data is picked to form a qualitative sample frame.
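
A minimal sketch of this design in Java (hypothetical records; tag each element with a random number, sort by it, keep roughly 5%):

import java.util.*;

public class SimpleRandomSample {
    public static void main(String[] args) {
        List<String> data = new ArrayList<String>();
        for (int i = 0; i < 1000; i++) data.add("record-" + i);

        // assign every element a random number
        final Map<String, Double> tag = new HashMap<String, Double>();
        Random random = new Random();
        for (String record : data) tag.put(record, random.nextDouble());

        // sort by the assigned random number
        Collections.sort(data, new Comparator<String>() {
            public int compare(String a, String b) {
                return tag.get(a).compareTo(tag.get(b));
            }
        });

        int frameSize = (int) Math.ceil(data.size() * 0.05);   // ~5% sample frame
        List<String> sampleFrame = data.subList(0, frameSize);
        System.out.println("sample frame size: " + sampleFrame.size());
    }
}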

Systematic Samples - Works for streams; occurrence of data elements is sequential. Once the stream has started arriving, an initial data element is picked; thereafter, every k-th element (k = total data set size / sample frame size) is picked into the sample. This is an even sampling algorithm unless some pattern occurs in the stream: if the same pattern repeats with the same period, every k-th sample will repeat in a cyclic manner.
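
A minimal sketch of systematic sampling over a simulated stream (hypothetical sizes; pick a random start, then every k-th element):

import java.util.*;

public class SystematicSample {
    public static void main(String[] args) {
        int populationSize = 1000;
        int frameSize = 50;
        int k = populationSize / frameSize;                 // sampling interval
        int start = new Random().nextInt(k);                // random starting offset

        List<Integer> sample = new ArrayList<Integer>();
        for (int i = 0; i < populationSize; i++) {          // i stands in for a streamed element
            if (i % k == start) sample.add(i);
        }
        System.out.println("picked " + sample.size() + " elements, interval k=" + k);
    }
}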

Stratified Samples - Works for streams. This design performs even sampling even if a pattern occurs in the population. It divides the whole data set into sub-data sets (strata); the sample frame from each sub-data set reflects the fractional share of that sub-data set in the whole. Hence, appropriately sized sample frames are selected from each sub-data set, and within each of them either simple random sampling or systematic sampling can be used to select samples fairly and without bias.
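
A minimal sketch of the proportional allocation step for stratified sampling (hypothetical strata sizes; within each stratum you would then apply one of the two designs above):

import java.util.*;

public class StratifiedAllocation {
    public static void main(String[] args) {
        Map<String, Integer> strataSizes = new LinkedHashMap<String, Integer>();
        strataSizes.put("region-A", 600000);
        strataSizes.put("region-B", 300000);
        strataSizes.put("region-C", 100000);

        int total = 0;
        for (int size : strataSizes.values()) total += size;

        int overallFrame = 50000;   // desired overall sample frame size
        for (Map.Entry<String, Integer> stratum : strataSizes.entrySet()) {
            // each stratum contributes in proportion to its share of the whole data set
            int share = (int) Math.round((double) overallFrame * stratum.getValue() / total);
            System.out.println(stratum.getKey() + " -> sample " + share + " elements");
        }
    }
}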

Cluster Samples - Works on batches. The whole data set is divided into clusters, each cluster acting as a microcosm of the whole data set with heterogeneous data elements. The intent is to increase the heterogeneity of data elements within each cluster and decrease the variability across clusters. Once this has been done, random clusters are picked, and sample frames are formed from them.

When taking samples from the overall data set, sampling error is inherent; it's the difference between the sample and the overall data set from which the sample has been collected. Mathematically, the standard error of the sample mean is directly proportional to the standard deviation of the overall data set and inversely proportional to the square root of the sample size. Hence, the larger the sample size, the more the sample mean concentrates around the overall data set mean and the more peaked the sampling distribution will be around it; the smaller the sample size, the flatter the sampling distribution will be around the overall data set mean.
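
A tiny illustration of that relationship (assumed sigma), showing the standard error of the mean shrinking as the sample size grows:

public class SamplingError {
    public static void main(String[] args) {
        double sigma = 12.0;                        // standard deviation of the overall data set (assumed)
        int[] sampleSizes = {100, 1000, 10000};
        for (int n : sampleSizes) {
            double standardError = sigma / Math.sqrt(n);
            System.out.println("n=" + n + " -> standard error of the mean = " + standardError);
        }
    }
}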

Working on unstructured data lying on HDFS, many times we can't be sure that the distribution of the data is normal, and the population variance is not known. It's difficult to come up with correct sampling in such cases. But a rule of thumb can be: assuming the whole data distribution is not heavily skewed and we are taking a sufficient sample size, a t-distribution (which uses the sample standard deviation in place of the unknown population one) should give us a good approximation of the distribution of the sample mean.

Saturday, March 24, 2012

RPC Library comparison for Big Data

There are various flavors of RPC implementations available in the open-source arena. Each RPC implementation library has its own pros and cons; ideally, we should select the RPC library according to the specific enterprise solution requirements of the project.
Some of the features that any RPC implementation aspires for are:
  • Cross Platform communication
  • Multiple Programming Languages
  • Support for Fast protocols (local, binary, zipped, etc.)
  • Support for Multiple transports
  • Flexible Server (configuration for non-blocking, multithreading, etc.)
  • Standard server and client implementations
  • Compatibility with other RPC libraries
  • Support for different data types and containers
  • Support for Asynchronous communication
  • Inherent support in Hadoop, NoSQL
  • Support for dynamic typing (no schema compilation)
  • Fast serialization
Focusing on the Big Data stack, below I compare a couple of RPC libraries.
 
(Scoring legend: critical requirements are rated out of 10, not-so-critical requirements out of 5.)

Support for                                               | Avro | Thrift | MessagePack | Protocol Buffers | BSON | Fast Infoset | Woodstox
----------------------------------------------------------|------|--------|-------------|------------------|------|--------------|---------
Cross Platform                                            |  10  |   10   |     10      |        10        |  10  |      10      |    10
Multiple Languages                                        |  10  |   10   |     10      |        10        |  10  |      10      |    10
Fast Protocols                                            |  10  |   10   |      3      |         3        |   3  |      10      |     3
Flexible Server (configurable thread pool, non-blocking)  |   7  |   10   |      7      |         0        |   3  |       3      |     0
Simple IDL                                                |   7  |   10   |     10      |         7        |   3  |       3      |     3
Standard Server and Client                                |  10  |   10   |     10      |         3        |   3  |       3      |     3
Fast and Compact Serialization                            |   5  |    7   |      7      |         6        |   6  |       6      |     7
Multiple Transports and Protocols                         |   7  |   10   |      3      |         0        |   0  |       0      |     0
Inherent Support in Hadoop                                |  10  |    3   |      0      |         0        |   0  |       0      |     0
Compatibility with other RPC Libraries                    |   5  |    5   |      0      |         0        |   0  |       0      |     0
Data Types, Containers                                    |  10  |    7   |     10      |         7        |   3  |       3      |     3
No Schema Compilation (dynamic typing)                    |   5  |    0   |      5      |         0        |   5  |       5      |     5
Asynchronous Calls/Callback                               |   0  |    5   |      5      |         2        |   0  |       0      |     0
Score (out of 115)                                        |  96  |   97   |     82      |        48        |  46  |      53      |    44
Thrift, Avro, and MessagePack look really impressive to me. Thrift and Avro support most of the requirements listed above and are very well battle-tested.
Another way to classify them:
  • for JSON based conversation between server and client, MessagePack is the best among all,
  • for Binary data conversations, BSON should be considered,
  • for XML based conversations, Fast Infoset and Woodstox should be considered.

Saturday, October 01, 2011

Deciding between ESB (ServiceMix, OpenESB, Mule, etc) vs. Camel

While designing a SOA solution, we face the question: "What will better fit my requirements, an ESB or Camel?"

I suggest first listing down your conceptual requirements, then matching them with what Camel or an ESB (ServiceMix, OpenESB, Mule, etc.) provides. A minimal Camel route sketch is included at the end of this post.

Camel provides:
  • Routing across multiple transports/components,
  • Transformation,
  • Mediation,
  • Orchestration
  • Monitoring

In addition to Camel an ESB provides:
  • Infrastructure features: support for multiple containers for hosting services (Spring, Camel, JBI, Web, ...); say you want to host new services in a JBI container in addition to existing services in an external web container, or publish ad hoc new services (Enterprise Information Integration) on a JBI container.
  • Modularization: OSGI support
  • Integration Platform.
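
To make the Camel side concrete, here is a minimal route sketch using the Java DSL (hypothetical endpoints; assumes camel-core on the classpath):

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class OrderRoute {
    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(new RouteBuilder() {
            public void configure() {
                // pick up files from a directory, convert the payload, and hand it to a log endpoint
                from("file:data/inbox")            // hypothetical source endpoint
                    .convertBodyTo(String.class)   // simple transformation step
                    .to("log:orders");             // swap in jms:, http:, etc. for real mediation targets
            }
        });
        context.start();
        Thread.sleep(5000);   // let the route poll briefly, then shut down
        context.stop();
    }
}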

Thursday, September 01, 2011

Crispy overview of Apache Mahout - Machine Learning on Hadoop

Apache Mahout [2] is a machine learning algorithm library, based on a Stanford University research paper published in 2006 [1], which is intended to run as MapReduce jobs on a Hadoop cluster.
Various ML (short for machine learning) algorithms have been implemented and classified into some practical categories. Following is a description of them.
Recommendation: a recommendation to the user based on the user's behaviour or historical records.
Algorithms:
  • Non-distributed (non-Hadoop solutions, can be run with just the Mahout library and Java SE 6)
  • Distributed: Slope-One, Distributed Nearest Neighbour (Item based), and Distributed Nearest Neighbour (User based)
In simple words, an algorithm like Slope One runs a two-step MapReduce. In the first step, the mapper computes the user-based item-pair matrix and the reducer computes the differences within each item pair. In the second step, the average difference per item pair is computed from the difference list (a small non-distributed sketch follows below).
Whereas, at a high level, the nearest-neighbour algorithms underneath use a measure like Pearson correlation to first estimate preference predictions and then pick the top preferences.
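
To make the Slope One step concrete, here is a small non-distributed sketch of the item-pair average-difference computation in plain Java (hypothetical ratings; Mahout's distributed version does the same work as MapReduce passes):

import java.util.*;

public class SlopeOneDiffs {
    public static void main(String[] args) {
        // userId -> (itemId -> rating), made-up data
        Map<String, Map<String, Double>> ratings = new HashMap<String, Map<String, Double>>();
        ratings.put("u1", new HashMap<String, Double>());
        ratings.get("u1").put("A", 4.0);
        ratings.get("u1").put("B", 3.0);
        ratings.put("u2", new HashMap<String, Double>());
        ratings.get("u2").put("A", 5.0);
        ratings.get("u2").put("B", 2.0);

        // step 1: per user, accumulate rating differences for every item pair
        Map<String, double[]> diffs = new HashMap<String, double[]>(); // "A|B" -> {sumOfDiffs, count}
        for (Map<String, Double> userRatings : ratings.values()) {
            for (Map.Entry<String, Double> i : userRatings.entrySet()) {
                for (Map.Entry<String, Double> j : userRatings.entrySet()) {
                    if (i.getKey().compareTo(j.getKey()) < 0) {       // each unordered pair once
                        String pair = i.getKey() + "|" + j.getKey();
                        double[] acc = diffs.get(pair);
                        if (acc == null) { acc = new double[2]; diffs.put(pair, acc); }
                        acc[0] += i.getValue() - j.getValue();
                        acc[1] += 1;
                    }
                }
            }
        }

        // step 2: average difference per item pair
        for (Map.Entry<String, double[]> e : diffs.entrySet()) {
            System.out.println(e.getKey() + " avg diff = " + e.getValue()[0] / e.getValue()[1]);
        }
    }
}
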
Clustering: Clustering algorithms are either joining (the term used is 'agglomerative') or breaking up (the term used is 'divisive'). Given a huge set of data, we gradually join or break groups based on a 'distance' calculation (again, there are many distance measures), eventually getting a grouping of the data that is more sensible, i.e. more related within each group. Many clustering algorithms are integrated into the Mahout library.
k-means, fuzzy k-means and canopy are a few well-known clustering algorithms. Survey and market research data suit divisive clustering Mahout algorithms quite well. They say the k-means problem is NP-hard (reminds me of engineering days and computation theory).
Classification: Combining the quantitative information or characteristics of a new individual item with the training set (used for the previous classifications), we decide its category. Tracking, discovery, and recognition are a couple of common application domains for classification algorithms. Up to the Mahout 0.5 release, Bayesian, logistic regression, and random forest classifiers are integrated, plus partial support for neural networks.
Dimension Reduction: probably the best use of parallelism, reducing the complexity of a multi-dimensional data set to fewer dimensions so that we can analyze the problem. Mahout has an implementation of singular value decomposition to solve this problem.
 
For application domain implementations, which could be social network sentiment analysis, analysing geospatial data, pattern recognition, robotic vision, etc., given a problem to be solved there are three possible approaches:
  • Map it to an existing Mahout-integrated algorithm and provide the data set on HDFS.
  • Implement our own solution for the algorithm as MapReduce programs.
  • Hybrid approach: utilize core Mahout-integrated algorithms and provide custom behaviour using MapReduce to deliver the solution.
 
References

Sunday, August 21, 2011

Using Https URL connection in Java

It’s neither a new technology nor something that hasn't been explored yet. I happened to come across a requirement where an https call had to be invoked from Java code. So the requirement was: given a URL which happens to be https, I needed to collect the server certificate, do the initial handshake (in SSL) or the later secure negotiation (in TLS), and finally access the resource on the https site.
Step 1: https server setup
This is the simplest step: do any web server https setup; you just need to extract the server certificate, since we will need it in our Java client trust store. What I did was start IIS 7 on my box and extract the server certificate to a file (servercert.pfx).
Step 2: Create jks keystore having server certificate
In case you already have a jks file, you can export and import the certificate using keytool. Since I had a .pfx file, the following needs to be done using openssl:
  • Extract key and certificate from PFX certificate to PEM format
    • Extracted key as: openssl pkcs12 -nocerts –in &lt;pfx-file> –out &lt;any-pem-extn-key-filename> -passin pass:&lt;pfx-password> -passout pass:&lt;pem-password>
    • Extracted cert as: openssl pkcs12 -clcerts -nokeys –in &lt;pfx-file> –out &lt;any-pem-extn-cert-filename> -passin pass:&lt;pfx-password>
  • Convert PEM key and certificate to DER
    • openssl pkcs8 -topk8 -nocrypt –in &lt;pem-extn-key-file> -inform PEM –out &lt;any-der-extn-key-filename> -outform DER -passin pass:&lt;pem-password>
    • openssl x509 –in &lt;pem-extn-cert-file> -inform PEM –out &lt;any-der-extn-cert-filename> -outform DER
  • Use DER key and certificate to make jks keystore
    • You have to create an instance of a keystore, load the bytes from the DER files, and call the setKeyEntry and setCertificateEntry methods. There is a piece of code available at agentbob named ImportKey.java; use that program (modify the keystore file name, alias, and keystore password as you want), or see the sketch just below this list.
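
For reference, a minimal sketch of what such an import utility does (file names, alias, and password here are placeholders, and an RSA key in unencrypted PKCS#8 DER form is assumed):

import java.io.*;
import java.security.*;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.security.spec.PKCS8EncodedKeySpec;

public class BuildJks {
    public static void main(String[] args) throws Exception {
        char[] password = "changeit".toCharArray();            // placeholder password

        // load the DER-encoded private key (assumes RSA, PKCS#8, unencrypted)
        byte[] keyBytes = readAll(new File("server-key.der"));  // placeholder file name
        PrivateKey privateKey = KeyFactory.getInstance("RSA")
                .generatePrivate(new PKCS8EncodedKeySpec(keyBytes));

        // load the DER-encoded certificate
        CertificateFactory cf = CertificateFactory.getInstance("X.509");
        Certificate cert = cf.generateCertificate(new FileInputStream("server-cert.der"));

        // build a fresh JKS keystore containing both entries
        KeyStore keystore = KeyStore.getInstance("JKS");
        keystore.load(null, password);                           // start with an empty keystore
        keystore.setKeyEntry("server", privateKey, password, new Certificate[]{cert});
        keystore.setCertificateEntry("server-cert", cert);

        FileOutputStream out = new FileOutputStream("servercert.jks");
        keystore.store(out, password);
        out.close();
    }

    private static byte[] readAll(File file) throws IOException {
        byte[] bytes = new byte[(int) file.length()];
        DataInputStream in = new DataInputStream(new FileInputStream(file));
        in.readFully(bytes);
        in.close();
        return bytes;
    }
}
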
For SSL establishment we only need the server certificate to be verified against the client's copy when the initial handshake happens. If the client trusts the copy of the SSL certificate the server has sent during SSL establishment, the client sends a message, and in turn the server sends a digitally signed acknowledgement to start the SSL-encrypted session. You may send a client certificate as well, but it's up to the SSL server side whether it is configured to accept/ignore/require client certificates.
Step 3: Write some Java client code to establish SSL socket connection
We need to set an SSLSocketFactory and a HostnameVerifier on HttpsURLConnection:
HttpsURLConnection.setDefaultHostnameVerifier(getHostVerifier());
HttpsURLConnection.setDefaultSSLSocketFactory(getSocketFactory());
HttpsURLConnection httpsConnection = (HttpsURLConnection)url.openConnection();
httpsConnection.connect();
InputStreamReader content = new InputStreamReader(httpsConnection.getInputStream());
int i;
while ((i = content.read()) != -1) {
    System.out.print((char) i);
}
content.close();




The hostname verifier is an optional component which helps you verify that your session peer is the host you intend to connect to; it can be written as:


private HostnameVerifier getHostVerifier() {
    return new HostnameVerifier() {
        public boolean verify(String urlHostName, SSLSession session) {
            // log the requested host against the actual peer and accept it; tighten this check for production use
            System.out.println("Warning: URL Host: " + urlHostName + " v/s " + session.getPeerHost());
            return true;
        }
    };
}




Most importantly, we need to set the SSL socket factory with the right keystore and truststore. We can use a TLS instance too; you can get the names of the SSL/TLS algorithms here.


private SSLSocketFactory getSocketFactory() throws ...{
    SSLContext sctx = SSLContext.getInstance("SSLv3");
    SecureRandom secureRandom = new SecureRandom();
    sctx.init(getKeyManagers(), getTrustManagers(), secureRandom);
    return sctx.getSocketFactory();
}




Key Managers and Trust Managers can be created as,

private KeyManager[] getKeyManagers() throws ...{
    KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
    kmf.init(keystore, password.toCharArray());
    return kmf.getKeyManagers();
}

private TrustManager[] getTrustManagers() throws ...{
    TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
    tmf.init(keystore);
    return tmf.getTrustManagers();
}




Finally, for the keystore instance used above in the key manager and trust manager, you have to load the keystore containing the server certificate:

File file = new File("servercert.jks");
FileInputStream is = new FileInputStream(file); 
keystore = KeyStore.getInstance("jks"); 
keystore.load(is, keystorePassword.toCharArray());    





Once I had this piece of code knitted together, I received the following error:
javax.net.ssl.SSLException: HelloRequest followed by an unexpected handshake message. I learned that we needed to set a system property:

System.setProperty("sun.security.ssl.allowUnsafeRenegotiation", "true");




Once this was done, I was able to make the https call to my IIS-hosted site as:


maker.makeHttpsCall(new URL("https://127.0.0.1/mysite"));




I hope this will help you establish an SSL connection from Java code.

Monday, May 16, 2011

Tuning Cassandra cluster configuration and improving performance

If you are considering migrating to a NoSQL solution and want a data store that is open source, massively scalable, fault tolerant, peer-to-peer, and column based, there is a high probability that you will choose Apache Cassandra. In my understanding, Apache Cassandra edges over Apache HBase when you need a more robust, peer-to-peer node topology. Then again, HBase provides tight integration with the Hadoop ecosystem, which in one way or another can also be achieved with Apache Cassandra.
Anyway, the yaml is always available to list the configurable parameters, but just to have a checklist, here are a couple of important factors that should be taken care of when configuring a Cassandra cluster for a production environment (cloud or non-cloud based):

Q: Do we need high availability?
A: Yes; then we need multiple nodes across multiple data centres. We need to configure the following:

  • Endpoint Snitch: specifies which Cassandra nodes are in the same data centre or the same rack, based on their IP addresses and the data centres present in the cluster. We can use an octet-based snitch (RackInferring), an explicit property file with node-to-DC mappings (PropertyFile snitch), or even an EC2 region based snitch (EC2Snitch).
  • Replication Factor: the number of copies of the data we want in our cluster, less than or equal to the number of nodes. 1 means no extra copies (just the original data).
  • Partitioner: controls how data is distributed across nodes. If you want to facilitate range slice queries or provide indexing using Lucene/Solr (over Cassandra), then you will need the order-preserving partitioner, but with it there is a probability of hot spots in your cluster. The random partitioner takes care of this hot-spot problem, as it performs an MD5 hash over the "data key" to get a "location key" that spans the range 0..2^127.
  • Seeds: the IP addresses of the other nodes in the cluster that a node contacts to discover the ring.
  • Replica Placement: you may want to control where data replicas are placed (something that adds to HA). OldNetworkTopology/RackAware places the first replica in another DC and the others (if required) in other racks of the same DC. NetworkTopology places replicas (as required by the replication factor) across each DC and each rack within it.
Q: Do you want new nodes to be added dynamically to the existing cluster?
A: Yes; then you will need to set AutoBootstrap to true. For consistent hashing, the initial tokens must evenly divide the keyspace. By specifying initial_token you dictate the token range the new node will be responsible for in the cluster. You can calculate the initial token for the k-th node in an n-node cluster as:
(k-1)*((2^127)/n)
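
For example, a tiny helper (plain Java, hypothetical 4-node cluster) that prints the tokens this formula yields:

import java.math.BigInteger;

public class InitialTokens {
    public static void main(String[] args) {
        int n = 4;                                            // hypothetical cluster size
        BigInteger range = BigInteger.valueOf(2).pow(127);    // token space 0..2^127
        for (int k = 1; k <= n; k++) {
            BigInteger token = range.divide(BigInteger.valueOf(n))
                                    .multiply(BigInteger.valueOf(k - 1));
            System.out.println("Node " + k + " initial_token: " + token);
        }
    }
}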

Q: How to ensure consistency across distributed Cassandra nodes?
A: Defined at the data access layer (Hector/Thrift/Avro), the consistency level plays an important role in determining whether the overall Cassandra cluster may serve stale data (if replication has not happened yet). Consistency level ANY or ONE for writes and ONE for reads gives better performance, but at the cost of eventually consistent data. Higher data consistency can be achieved with QUORUM ((RF/2)+1), LOCAL_QUORUM ((RF/2)+1 within the local DC), EACH_QUORUM ((RF/2)+1 within each DC), and finally ALL (which blocks until the read/write is served by all replicas).
Just to sum up, if you want strongly consistent data, ensure that (write consistency level + read consistency level) > replication factor. QUORUM provides consistent data plus availability in the case of some node failures. If performance/latency matters more, then you can lower the consistency level for either reads or writes or both; a small illustration follows.
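
A small arithmetic illustration of that rule, outside of any Cassandra API (assumed replication factor of 3):

public class ConsistencyCheck {
    public static void main(String[] args) {
        int replicationFactor = 3;                       // hypothetical RF
        int quorum = (replicationFactor / 2) + 1;        // QUORUM = (RF/2)+1 = 2 when RF is 3
        int writeNodes = quorum;                         // writes done at QUORUM
        int readNodes = quorum;                          // reads done at QUORUM
        boolean stronglyConsistent = (readNodes + writeNodes) > replicationFactor;
        System.out.println("QUORUM=" + quorum + ", R+W>RF -> strongly consistent: " + stronglyConsistent);
    }
}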

Q: How to achieve performance SLAs?
A: Read performance can be enhanced by tuning a couple of parameters:

  • A column family lets you enable key and/or row caching on it. You can specify the number of keys/rows, or a percentage or fraction of the whole data, that you want to keep in the cache. The key cache is lighter, as the data stored in it is just keys; by default Cassandra caches 200,000 keys per CF. The key cache saves an I/O to the index file otherwise needed to figure out the row location of the data corresponding to that key, so it is very productive to have it enabled. The row cache holds the entire content of a row in memory and is off by default. The overhead of enabling the row cache, or over-growing it, is that you may need a larger JVM heap for Cassandra, which may itself hurt performance. If a row's columns are small, you can increase the row cache size without worrying too much about memory consumption; if the columns are large, growing the row cache is trickier. The best way to fine-tune it is to watch the "RowCacheHitRatio" exposed per CF over JMX during a representative data run and size the cache accordingly. If "RowCacheHitRatio" is too low, say below 20-30% (which happens when writes far outnumber reads or reads are extremely random, and which in any case triggers heavy GC activity), then I would suggest that enabling row caching doesn't make sense. You can use nodetool cfstats to see the hit ratio of the key/row caches; it takes time to get a good idea of the hit rate because it matures gradually.
  • Read performance can also be enhanced by fine-tuning Concurrent Reads; the usual rule is 4 threads per CPU core in the cluster. The higher the value, the more threads are spawned for reads; if the machines have faster I/O than the usual commodity boxes, this number can be raised a bit. Increasing it too much causes excessive context switching and does not yield further gains.
  • Try to use a fast RPC client instead of a wrapper over a wrapped RPC layer.
  • Read performance also gets seriously degraded by contention with SSTable compaction. The live SSTable count exposed over JMX (LiveSSTableCount, if I recall correctly) gives a typical indication of how many SSTables are in use for a CF; alternatively, the SSTable count in nodetool cfstats gives the same. If this number is high or increasing, your reads may be contending with SSTable compaction; also, the more SSTables, the more fragmented (internally and externally) your data is. Increasing settings like memtable_flush_after_mins (or memtable_operations_in_millions, or memtable_throughput_in_mb {total RAM/(1048576*16)}) reduces memtable flushes, which reduces the number of SSTables and eventually the number of compactions. Hence there is less conflict with read operations, which improves read performance. The larger these values, the more heap memory is needed to hold the in-memory structures. Even the priority of the internal compaction thread can be controlled (which eventually reduces the impact of compactions); this is passed as a JVM argument to Cassandra.

Write performance can also be tuned, but do we really need to speed up the lightning-fast write path Cassandra already provides? Anyway, the following are some of the ways to do it:

  • We can use the bulk-loading APIs (StorageProxy, mutation or binary verbs) to write to Cassandra instead of using one of the wrappers.
  • Decreasing the write consistency level also helps, as fewer nodes need to be blocked for the write operation.
  • It's preferable to use a separate disk drive for the commit log; this avoids I/O contention between commit log writes and SSTable reads.
  • You can tune concurrent_writes upwards from the default of 32. Concurrent writers should be set a bit higher than concurrent reads, in the range of 1x to 1.5x (x = number of concurrent reads); some set it as high as around 12 times the number of cores.
Swap memory can pose important challenges to achieving effective read/write performance in a Java application; the operating system swaps pages to and from disk even when sufficient memory is available, and during swap reads/writes you may observe a drop in performance. You can disable swap with the swapoff command. On Linux-based OSes, you can append vm.swappiness=0 (or 5) to /etc/sysctl.conf to reduce the likelihood of the OS using swap space. Cassandra provides a memory_locking_policy parameter in the yaml which you need to enable; the JNA libraries help lock the JVM memory, making it unevictable so it cannot be swapped.
The performance of memory-mapped I/O is better than regular I/O; on a 64-bit architecture you can use this more efficient I/O by setting disk_access_mode in the yaml to mmap.

Q: Do I need to tune the JVM for a Cassandra cluster setup?
A: Yes, you may want to customize it. Make MAX and MIN heap sizes the same (to avoid full GCs during heap growth/shrink), since it's expensive to grow the heap: set MIN to what you estimate Cassandra's maximum consumption to be, and MAX a bit higher, just in case Cassandra is hit with more load. In most cases it is better to do a full GC and grow the heap than to hit an OutOfMemory and crash. The heap size to allocate to Cassandra can be approximated as memtable_throughput_in_mb * 3 * (number of hot column families) + 1 GB + key cache + row cache (a worked example follows below). Cassandra defaults to GC settings that are well suited to big-data workloads. For example, with -XX:SurvivorRatio (defaulted to 8) we try to retain as many objects in the survivor spaces as possible so that they can be reclaimed by GC; if we increase the size of the survivor spaces, copying these long-lived objects between survivors will trigger minor GCs. It's an interesting balance, but it is better to copy more between survivors than to promote more to the tenured generation. The tenuring distribution can be seen using -XX:+PrintTenuringDistribution.
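
A rough worked example of that heap estimate with made-up numbers (all figures in MB; real values come from your own yaml settings and cache sizing):

public class HeapEstimate {
    public static void main(String[] args) {
        int memtableThroughputMb = 128;  // hypothetical memtable_throughput_in_mb
        int hotColumnFamilies = 4;       // hypothetical number of hot CFs
        int keyCacheMb = 100;            // hypothetical key cache footprint
        int rowCacheMb = 512;            // hypothetical row cache footprint
        // memtable_throughput_in_mb * 3 * hot CFs + 1 GB + key cache + row cache
        int heapMb = memtableThroughputMb * 3 * hotColumnFamilies + 1024 + keyCacheMb + rowCacheMb;
        System.out.println("Approximate heap needed: " + heapMb + " MB");
    }
}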

We have -XX:+UseParNewGC; the parallel collector has ergonomics for tuning the young generation: it avoids or decreases the frequency of major GCs, avoids full GCs by reducing promotions, and makes the most of the heap. We also use CMS (ConcurrentMarkSweepGC) for the old generation; CMS is designed to minimize stop-the-world pauses. The number of concurrent threads in CMS can be controlled with -XX:ParallelCMSThreads=<n>. Pauses during the remark phase can be reduced by enabling -XX:+CMSParallelRemarkEnabled. One more point if you want to use it: by default, classes are not unloaded from the permanent generation when using CMS; this can be enabled with -XX:+CMSClassUnloadingEnabled.

On 64-bit machines, from Java SE 6 update 14 onwards, you can utilize compressed pointers, which result in smaller heap sizes; append -XX:+UseCompressedOops to enable this.

Q: Any hardware changes to boost the performance?
A: Yes sure.
  • You can utilize 10 Gigabit ethernet to increase network throughput.
  • You can increase RAM; more RAM can be given to the Cassandra JVM heap, which eventually benefits many of the factors above.
  • Multi Channel Memory Architecture (MCMA) technology increases the transfer rate between RAM and the memory controller by adding channels; theoretically the transfer rate gets multiplied by the number of channels. Modern high-end chipsets, like the i7-9x series or the latest Xeon chipsets, support triple-channel memory; quad- or eight-channel memory chipsets also exist. You can optimise the RAM for the memory channels by installing DIMMs in groups matching the number of channels; for example, with a triple-channel chipset, let each server have triplets of DIMMs. Cross-check the MCMA support of your server's processor as well as its motherboard, and pair up your DIMMs accordingly to extract the best performance from the RAM.
  • On a master or server node you typically add more disks as RAID, whereas, to avoid administrative overhead and keep writes pipelined, you should add disks to slaves as JBOD (typically 0.8 x cores disks for optimal performance, and not more than 1 x cores disks).
  • You can use fast-seeking disks; Solid State Drives (SSDs) and SCSI systems also perform better. Typically, SATA drives are recommended over SAS drives.
  • If your filesystem is ext3, you can mount with noatime so that access-time bookkeeping overhead is avoided; the change needs to be made in /etc/fstab, followed by mount -o remount /. If you have the option of another file system, you can use faster filesystems like ext4 and XFS, which also support bigger file sizes and bigger volumes.
  • You can bond multiple network cards together to achieve higher network throughput.