Saturday, August 15, 2009

MTOM/XOP and performance improvements in CXF

Message Transmission Optimization Mechanism (MTOM) is a core feature of any web service engine. CXF supports MTOM with XOP.

Brief description about MTOM/XOP in SOAP:

In standard SOAP messages, binary objects are base64-encoded and included in the message body. This increases their size by about 33%, which for very large binary objects can significantly increase transmission time. The SOAP Message Transmission Optimization Mechanism (MTOM) and XML-binary Optimized Packaging (XOP) specifications define a method for optimizing the transmission of large base64Binary data objects within SOAP messages:

XOP is used as the referencing mechanism in the serialised XML infoset. In theory, the abstract MTOM model could be used with a different referencing mechanism and/or different container format, over a different transport protocol instead of HTTP. In practice, MTOM is usually used with XOP, MIME and HTTP.

MTOM defines an Optimized MIME Multipart/Related Serialization of SOAP Messages: the serialized XML infoset carries XML-binary Optimized Packaging (XOP) placeholders in place of the binary data, and the binary data is packaged together with that infoset in a single MIME container.
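To make the base64 overhead mentioned above concrete, here is a small sketch that compares raw and encoded sizes. The class and method names are made up for illustration, and it assumes Java 8 or later for java.util.Base64.

```java
import java.util.Base64;

class Base64Overhead {
    /** Length of padded base64 output for rawLength input bytes: 4 chars per 3-byte group. */
    static long encodedLength(long rawLength) {
        return 4 * ((rawLength + 2) / 3);
    }

    /** Actual encoded length as produced by the JDK encoder. */
    static int actualEncodedLength(byte[] raw) {
        return Base64.getEncoder().encode(raw).length;
    }

    public static void main(String[] args) {
        byte[] raw = new byte[12 * 1024 * 1024]; // 12 MB of zero bytes as a stand-in payload
        int encoded = actualEncodedLength(raw);
        double overhead = 100.0 * (encoded - raw.length) / raw.length;
        System.out.printf("raw=%d encoded=%d overhead=%.1f%%%n", raw.length, encoded, overhead);
    }
}
```

Every 3 input bytes become 4 output characters, which is where the roughly one-third growth comes from. MTOM avoids it by sending the bytes as a separate MIME part.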

Performance issue description: In CXF 2.0.7, MTOM performance was poor. In my initial tests, transferring 12 MB of data took around 1.2-1.4 seconds, which is pretty slow compared to the MTOM performance of other WS stacks.

Sharing the approach: We integrated CXF into the Pramati Java EE 1.5 application server. To dig into the problem, I first took the application server's inbound and outbound web container streams out of the picture, then ruled out the CXF interceptors one by one until I reached the one that composes and decomposes the MTOM SOAP message, and finally arrived at MimeBodyPartInputStream and PushbackInputStream.

MimeBodyPartInputStream doesn't implement the read(byte[]) method, so the call falls through to the parent InputStream class. InputStream's read(byte[]) runs a loop that calls MimeBodyPartInputStream's read() once per byte, which in turn reads a single byte from PushbackInputStream and performs boundary matching on that byte.
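The fallback described above is easy to observe with a toy stream. The sketch below is not CXF code: CountingStream is a hypothetical class that, like the old MimeBodyPartInputStream, overrides only read(), so the inherited bulk read ends up calling it once per byte.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

class SingleByteReadDemo {
    /** Hypothetical stream that overrides only the single-byte read(). */
    static class CountingStream extends InputStream {
        private final byte[] data;
        private int pos;
        int singleByteReads; // how many times read() was invoked

        CountingStream(byte[] data) {
            this.data = data;
        }

        @Override
        public int read() {
            singleByteReads++;
            return pos < data.length ? (data[pos++] & 0xFF) : -1;
        }
    }

    /** Performs one bulk read of 'size' bytes and reports how many read() calls it caused. */
    static int countReads(int size) {
        CountingStream in = new CountingStream(new byte[size]);
        try {
            // Inherited InputStream.read(byte[]) loops over read(), one call per byte.
            in.read(new byte[size]);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return in.singleByteReads;
    }

    public static void main(String[] args) {
        System.out.println("read() calls for one 4096-byte bulk read: " + countReads(4096));
    }
}
```

For a 12 MB attachment that means roughly 12 million read() calls, each with its own boundary checks.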

A simple test program that times how long MimeBodyPartInputStream takes to read from a preloaded buffer shows that it needs ~1200-1400 ms for a 12 MB buffer. A plain InputStream, on the other hand, takes around 100-150 ms. Some gap is expected: MimeBodyPartInputStream has to detect probable boundaries, which takes several if checks, and it calls read and unread on PushbackInputStream. Even allowing for those reads and unreads, though, its time looked poor.
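The original test program is not shown here, so the following is only a rough sketch of what such a timing harness might look like. PerByteStream is a hypothetical stand-in that forwards single-byte reads to a PushbackInputStream and does no boundary matching, so the numbers it prints will vary by machine and JVM and should not be expected to match the figures in this post.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.io.UncheckedIOException;

class ReadTimingSketch {
    /** Hypothetical stand-in: only read() is overridden, so bulk reads go byte by byte. */
    static class PerByteStream extends InputStream {
        private final PushbackInputStream in;

        PerByteStream(InputStream source) {
            this.in = new PushbackInputStream(source, 8192);
        }

        @Override
        public int read() throws IOException {
            return in.read();
        }
    }

    /** Drains the stream through an 8 KB buffer and returns the number of bytes read. */
    static long drain(InputStream in) {
        byte[] buf = new byte[8192];
        long total = 0;
        try {
            int n;
            while ((n = in.read(buf, 0, buf.length)) != -1) {
                total += n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }

    /** Wall-clock time in milliseconds needed to drain the stream. */
    static long timeMillis(InputStream in) {
        long start = System.nanoTime();
        drain(in);
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        byte[] payload = new byte[12 * 1024 * 1024]; // preloaded 12 MB buffer
        System.out.println("bulk read:     " + timeMillis(new ByteArrayInputStream(payload)) + " ms");
        System.out.println("per-byte read: "
            + timeMillis(new PerByteStream(new ByteArrayInputStream(payload))) + " ms");
    }
}
```

A single cold run like this is a crude measurement. Repeating each case a few times and discarding the first runs gives the JIT a chance to warm up.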

An alternative implementation of MimeBodyPartInputStream solved the performance problem. It creates a buffer dynamically when needed, uses the buffer for forward and backward index movement, and implements read(byte[] buffer, int off, int len). Reading the buffer and matching the boundary are now performed by this implementation.
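The general idea of reading in bulk, scanning the chunk for the boundary, and pushing back whatever was read too far can be sketched in a few lines. This is a deliberately simplified illustration, not the CXF implementation: the class, readPart and fill are hypothetical names, the CRLF prefix and trailing "--" handling are left out, and it assumes chunkSize is at least twice the boundary length and that the PushbackInputStream was created with a pushback capacity of at least chunkSize.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.io.UncheckedIOException;

class BulkBoundaryScanSketch {
    /** Index of the first occurrence of pattern in buf[0..len), or -1 if absent. */
    static int indexOf(byte[] buf, int len, byte[] pattern) {
        outer:
        for (int i = 0; i + pattern.length <= len; i++) {
            for (int j = 0; j < pattern.length; j++) {
                if (buf[i + j] != pattern[j]) {
                    continue outer;
                }
            }
            return i;
        }
        return -1;
    }

    /** Reads until buf is full or the stream ends; returns the number of bytes read. */
    static int fill(InputStream in, byte[] buf) throws IOException {
        int total = 0;
        while (total < buf.length) {
            int n = in.read(buf, total, buf.length - total);
            if (n == -1) {
                break;
            }
            total += n;
        }
        return total;
    }

    /** Returns the bytes before the boundary and leaves the stream just after it. */
    static byte[] readPart(PushbackInputStream in, byte[] boundary, int chunkSize) {
        byte[] buf = new byte[chunkSize];
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            while (true) {
                int n = fill(in, buf);
                int idx = indexOf(buf, n, boundary);
                if (idx >= 0) {
                    out.write(buf, 0, idx);
                    int rest = idx + boundary.length;
                    in.unread(buf, rest, n - rest); // bytes after the boundary belong to the next part
                    return out.toByteArray();
                }
                if (n < buf.length) { // stream ended without a boundary
                    out.write(buf, 0, n);
                    return out.toByteArray();
                }
                // The chunk's tail may be the start of a boundary: push it back and rescan it.
                int keep = boundary.length - 1;
                out.write(buf, 0, n - keep);
                in.unread(buf, n - keep, keep);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Pushing back the last boundary.length - 1 bytes of a full chunk is what lets a boundary that straddles two chunks still be found, at the cost of rescanning a few bytes per chunk.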

The performance numbers after the changes were (12 MB of data; MBPIS = MimeBodyPartInputStream):

1. ~1578 ms (current CXF MBPIS)
2. ~172 ms (just read raw 12MB data)
3. ~188 ms (new CXF MBPIS)

They look more encouraging, sensible and charming :)

So the next time you use the MTOM feature in CXF 2.0.10, 2.1.4 or any later release, you will get better performance.

In case you want to see the changes, they are below:

Index: trunk/rt/core/src/main/java/org/apache/cxf/attachment/MimeBodyPartInputStream.java
===================================================================
diff -u -N -r651669 -r718620
--- trunk/rt/core/src/main/java/org/apache/cxf/attachment/MimeBodyPartInputStream.java  (.../branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/attachment/MimeBodyPartInputStream.java)  (revision 651669)
+++ trunk/rt/core/src/main/java/org/apache/cxf/attachment/MimeBodyPartInputStream.java  (.../trunk/rt/core/src/main/java/org/apache/cxf/attachment/MimeBodyPartInputStream.java)  (revision 718620)
@@ -28,15 +28,159 @@
     PushbackInputStream inStream;
 
     boolean boundaryFound;
-
+    int pbAmount;
     byte[] boundary;
+    byte[] boundaryBuffer;
 
-    public MimeBodyPartInputStream(PushbackInputStream inStreamParam, byte[] boundaryParam) {
+    public MimeBodyPartInputStream(PushbackInputStream inStreamParam, 
+                                   byte[] boundaryParam,
+                                   int pbsize) {
         super();
         this.inStream = inStreamParam;
         this.boundary = boundaryParam;
+        this.pbAmount = pbsize;
     }
 
+    public int read(byte buf[], int origOff, int origLen) throws IOException {
+        byte b[] = buf;
+        int off = origOff;
+        int len = origLen;
+        if (boundaryFound) {
+            return -1;
+        }
+        if ((off < 0) || (off > b.length) || (len < 0) 
+            || ((off + len) > b.length) || ((off + len) < 0)) {
+
+            throw new IndexOutOfBoundsException();
+        }
+        if (len == 0) {
+            return 0;
+        }
+        boolean bufferCreated = false;
+        if (len < boundary.length * 2) {
+            //buffer is too short to detect boundaries with it.  We'll need to create a larger buffer   
+            bufferCreated = true;
+            if (boundaryBuffer == null) {
+                boundaryBuffer = new byte[boundary.length * 2];
+            }
+            b = boundaryBuffer;
+            off = 0;
+            len = boundaryBuffer.length;
+        }
+        if (len > pbAmount) {
+            len = pbAmount;  //can only pushback that much so make sure we can
+        }
+        if (len > 0) {
+            len = inStream.read(b, off, len);
+        }
+        int i = processBuffer(b, off, len);
+        if (bufferCreated && i > 0) {
+            // read more than we need, push it back
+            if (origLen >= i) {
+                System.arraycopy(b, 0, buf, origOff, i);
+            } else {
+                System.arraycopy(b, 0, buf, origOff, origLen);
+                inStream.unread(b, origLen, i - origLen);
+                i = origLen;
+            }
+        } else if (i == 0 && boundaryFound) {
+            return -1;
+        }
+        return i;
+    }
+
+    //Has Data after encountering CRLF
+    private boolean hasData(byte[] b, int initialPointer, int pointer, int off, int len)
+        throws IOException {
+        if (pointer < (off + len)) {
+            return true;
+        } else {
+            inStream.unread(b, initialPointer, (off + len) - initialPointer);
+            return false;
+        }
+    }
+
+    protected int processBuffer(byte[] buffer, int off, int len) throws IOException {
+        for (int i = off; i < (off + len); i++) {
+            boolean needUnread0d0a = false;
+            int value = buffer[i];
+            int initialI = i;
+            if (value == 13) {
+                if (!hasData(buffer, initialI, initialI + 1, off, len)) {
+                    return initialI - off;
+                }
+                value = buffer[initialI + 1];
+                if (value != 10) {
+                    continue;
+                } else {  //if it comes here then 13, 10 are values and will try to match boundaries
+                    if (!hasData(buffer, initialI, initialI + 2, off, len)) {
+                        return initialI - off;
+                    }
+                    value = buffer[initialI + 2];
+                    if ((byte) value != boundary[0]) {
+                        i++;
+                        continue;
+                    } else { //13, 10, boundaries first value matched
+                        needUnread0d0a = true;
+                        i += 2; //i after this points to boundary[0] element
+                    }
+                }
+            } else if (value != boundary[0]) {
+                continue;
+            }
+
+            int boundaryIndex = 0;
+            while ((boundaryIndex < boundary.length) && (value == boundary[boundaryIndex])) {
+                if (!hasData(buffer, initialI, i + 1, off, len)) {
+                    return initialI - off;
+                }                
+                value = buffer[++i];
+                boundaryIndex++;
+            }
+            if (boundaryIndex == boundary.length) {
+                // read the end of line character
+                if (initialI != off) {
+                    i = 1000000000;
+                }
+                if (!hasData(buffer, initialI, i + 1, off, len)) {
+                    return initialI - off;
+                }
+                boundaryFound = true;
+                int j = i + 1;
+                if (j < len && buffer[j] == 45 && value == 45) {
+                    // Last mime boundary should have a succeeding "--"
+                    // as we are on it, read the terminating CRLF
+                    i += 2;
+                    //last mime boundary
+                }
+
+                //boundary matched (may or may not be last mime boundary)
+                int processed = initialI - off;
+                if ((len - (i + 2)) > 0) {
+                    inStream.unread(buffer, i + 2, len - (i + 2));
+                }
+                return processed;
+            }
+
+            // Boundary not found. Restoring bytes skipped.
+            // write first skipped byte, push back the rest
+            if (value != -1) { //pushing back first byte of boundary
+                // Stream might have ended
+                i--;
+            }
+            if (needUnread0d0a) { //Pushing all,  returning 13
+                i = i - boundaryIndex;
+                i--; //for 10
+                value = 13;
+            } else {
+                i = i - boundaryIndex;
+                i++;
+                value = boundary[0];
+            }
+        }
+        return len;
+    }
+
     public int read() throws IOException {
         boolean needUnread0d0a = false;
         if (boundaryFound) {
@@ -77,8 +221,9 @@
         if (boundaryIndex == boundary.length) {
             // boundary found
             boundaryFound = true;
+            int dashNext = inStream.read();
             // read the end of line character
-            if (inStream.read() == 45 && value == 45) {
+            if (dashNext == 45 && value == 45) {
                 // Last mime boundary should have a succeeding "--"
                 // as we are on it, read the terminating CRLF
                 inStream.read();