2014年7月15日 星期二

使用Java memory mapped file來作為IPC (Inter-Process Communication) 的橋樑

在上一篇的文章我們驗證了Java MappedByteBuffer (透過MemoryMapped file來映射)的讀寫的throughput是非常的好, 所以延續這一篇我們就利用它來做為不同Java processes (也就是不同的JVM)如何利用它來共同分享資料。

Memory Mapped File的WIKI

所謂的memory-mapped file可視為一種虛擬記憶體 (Virtual Memory)的區段 (Segment) 映射,通常都是映射至硬碟或是SSD的非揮發性的記憶裝置上。一但映射好了之後, 就可以如同一般記憶體般地來使用。

好處在於使用這種技術可以提高I/O的效率尤其是對於比較大的檔案,至於對於小檔案來說可能就不是那麼地適合。為啥? 因為作業系統的paging size大約是4 K bytes, 如果我們只映射了5 K bytes的話¸ 那麼就有3 K bytes的paging的空間是浪費了。另外一個好處就是對於一個Size很大的檔案 (比記憶體大), 我們可以使用一小塊記憶體就可以把它映射進來處理。

當然這世上是沒有白吃的午餐的, 使用memory-mapped file有可能會因為page fault (當一段資料被載入到page cache中但是映射到virtual memory的過程還沒完成)而導致速度比正常I/O還差 (還好這種機率不高---擦汗!!!!)

另外要小心的是, 在32位元的作業系統中也不適合使用, 因為32位元的記憶體定址空間有限(大約是4GiB)。

範例程式

以下我將使用兩支Java 的程式來示範如何用memory mapped file來共享資料 (IPC – Inter Process Communication)。一支程式專門用來生產message物件 (producer)及別外一支程式用來消費message物件(consumer), 因為是範例啦, 所以我沒有作太多例外的處理, 理論上兩支程式是可以是時運行的喔!!

IMemoryRepository.java

package ew.blog.memmapipc;

/**
 * 這個interface定義了一個簡單的基於記憶體的訊息存取的方法。
 * 為了便利演繹出blog文章的主軸,在訊息(message)的結構上是固定為:
 * 1.Int - 4 bytes
 * 2.Long - 8 bytes
 * 3.Byte - 1 bytes
 * 
 * 要存放一個message時要先哷叫navigate(index)方法來移動指標到對應的位址,
 * 然後呼叫setInt, setLong及setByte來存放訊息資料。若要取回訊息資料時, 
 * 也是要先哷叫navigate(index)方法來移動指標到對應的位址, 然後再使用
 * getInt, getLong及getByte方法來取回資料
 * 
 * @author ErhWen,Kuo (erhwenkuo@gmail.com)
 *
 */
public interface IMemoryRepository {
 
 void navigate(int index);

 void setInt(int value);

 void setLong(long value);

 void setByte(byte value);

 int getInt();

 long getLong();

 byte getByte();
}

MappedByteBufferRepository.java

package ew.blog.memmapipc;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;

public class MappedByteBufferRepository implements IMemoryRepository{
    // 由於ByteBuffer是一個連續的Memroy區塊, 所以必須要先定義
    // 儲放的順序與大小以便存取
    // 在本範例中, StoreValue有:
    //              1: intValue (int: 4 bytes)
    //              2: longValue (long: 8 bytes)
    //              3: byteValue (byte: 1 bytes)
    // 我們會依序來儲放StoreValue的properties
 
    private final static int intValue_Offset = 0;
    private final static int longValue_Offset = 4;
    private final static int byteValue_Offset = 12;

    // 在本範例message物件的長度(int:4 + long:8 + byte:1 = 13)
    private final static int storageValue_Size = 13; 
 
    private ByteBuffer objectStore;

    //用來作為定位每個一物件在memory中的位置(position)
    private int pos; 
 
    public MappedByteBufferRepository(File memMappedFile, int capacity){    
        RandomAccessFile raf = null;
        FileChannel fc = null;
  
 try {
     raf = new RandomAccessFile(memMappedFile, "rw");
     fc = raf.getChannel();
     objectStore = 
              fc.map(MapMode.READ_WRITE, 0, capacity * storageValue_Size);

     objectStore.order(ByteOrder.nativeOrder());
 } catch (Exception e) {
     e.printStackTrace();
 } finally{
     //把FileChannel與RandomAccessFile物件給Close起來
     try{
  fc.close();
  raf.close();
     } catch (IOException e) {}
     }
 }
 
 public void navigate(int index) {
     pos = index * storageValue_Size;
 }

 public void setInt(int value) {
     objectStore.putInt(pos + intValue_Offset, value);
 }

 public void setLong(long value) {
     objectStore.putLong(pos + longValue_Offset, value);  
 }

 public void setByte(byte value) {
     objectStore.put(pos + byteValue_Offset, value);  
 }

 public int getInt() {
     return objectStore.getInt(pos + intValue_Offset);
 }

 public long getLong() {
     return objectStore.getLong(pos + longValue_Offset);
 }

 public byte getByte() {
     return objectStore.get(pos + byteValue_Offset);
 }

}

MemMappedMsgWriter.java

package ew.blog.memmapipc;

import java.io.File;
public class MemMappedMsgWriter {
    public static void main(String[] args) throws Exception {  
        // 取得temp的目錄路徑
        String tmpDir = System.getProperty("java.io.tmpdir"); 
 
        // 為這個memory mapped的檔案給名
        String memMappedFileName = tmpDir + "/memmappedfile.dat"; 
 
        // 產生一個標準的Java File物件
        File memMappedFile = new File(memMappedFileName);
  
        // 預計產生一千萬個Message (每個Message長度是13 bytes)
        int messageSize = 10000000; 
  
        // 初始一個Memory Mapped File的ByteBuffer來儲存資料
        IMemoryRepository memStore = 
           new MappedByteBufferRepository(memMappedFile, messageSize); 
 
        long start = System.currentTimeMillis();

        for(int i=0; i<messageSize; i++){

            // 移動指標
     memStore.navigate(i);  
     // 儲放資料
     memStore.setInt(i);
     memStore.setLong((long)i);
     memStore.setByte((byte)i);
        }  

        long end = System.currentTimeMillis();
 
        System.out.println(
           String.format("Persist %s Messages, total spends %s ms",
              messageSize, (end-start)));
    }
}

MemMappedMsgReader.java

package ew.blog.memmapipc;

import java.io.File;

public class MemMappedMsgReader {

    public static void main(String[] args) throws Exception {  
        // 取得temp的目錄路徑
 String tmpDir = System.getProperty("java.io.tmpdir"); 
 
 // 為這個memory mapped的檔案給名
 String memMappedFileName = tmpDir + "/memmappedfile.dat"; 
 
 // 產生一個標準的Java File物件
 File memMappedFile = new File(memMappedFileName);
  
 // 預計讀進一千萬個Messages (每個Message長度是13 bytes)
 int messageSize = 10000000; 
  
 // 初始一個Memory Mapped File的ByteBuffer來讀取資料
 IMemoryRepository memStore = 
           new MappedByteBufferRepository(memMappedFile, messageSize);  

 long start = System.currentTimeMillis(); 
 
 for(int i=0; i<messageSize; i++){

     // 移動指標
     memStore.navigate(i);
   
     // 讀取資 料
     int messageIntData = memStore.getInt();
     long messageLongData = memStore.getLong();
     byte messageByteDataq = memStore.getByte();
 }
  
 long end = System.currentTimeMillis();  
 System.out.println(
            String.format("Read %s Messages, total spends %s ms", 
               messageSize, (end-start)));
 }
}

結論

Memory mapped file的技術對於開發IPC (Inter-Process Communication)相關的需求來說應該是一種很不錯的選項。而且讀寫的速度相對使用Socket的方法來說是有效率的多了。

範例的源碼在這裡

2014年7月14日 星期一

那一種Java的記憶體在使用上會比較快呢?

最近在研究java的memory mapped file的技術的時候,看到了一篇網路上的文章是有關於java幾種不同記憶體技術(Heap, OffHead, ByteBuffer與DirectByteBuffer)在讀寫的throughput上的比較。

有與趣看英文的測試原文者請參考以下Blog ”Which memory is faster Heap or ByteBuffer or Direct? (http://www.javacodegeeks.com/2013/08/which-memory-is-faster-heap-or-bytebuffer-or-direct.html)”。

雖然這幾年有愈來愈多的基於JVM的程式語言被開發出來, 但是談到memory的使用與管理的時候, 回到基本上的了解卻還是必要的。在這篇文章裡我們還是驗證一下幾種memory allocation在Java下的差異與讀寫的速度。

Java 記憶體的allocation

到底Java提供那些記憶體allocation的型態呢?

Heap Memory

大家都知道Java的JVM啟動的時候需要配置”Xmx”參數, 而這個參數就定義了這個JVM最大可用的Memory pool。基本上我們在Java裡頭用”new”這個關鍵字所初始的物件 都會在這個Memory pool當中。

在這次的驗證中, 我們將使用Java的array來看看Head Memory的read/write throughtput。

Non Direct ByteBuffer

在Java中, 我們當用ByteBuffer.allocate()方法來取得一個ByteBuffer物件, 而這個ByteBuffer物件它包裝了對byte array的相關操作, 當我們處理的對象是bytes而不是物件的情形下, ByteBuffer是一個很好用的類別。

Direct ByteBuffer

Direct ByteBuffer在JDK 1.4這個版本時被release出來。在Java Doc裡頭是這樣講的:

“A direct byte buffer may be created by invoking the allocateDirect factory method of this class. The buffers returned by this method typically have somewhat higher allocation and deallocation costs than non-direct buffers. The contents of direct buffers may reside outside of the normal garbage-collected heap, and so their impact upon the memory footprint of an application might not be obvious. It is therefore recommended that direct buffers be allocated primarily for large, long-lived buffers that are subject to the underlying system’s native I/O operations. In general it is best to allocate direct buffers only when they yield a measureable gain in program performance.”

歸納以上的說明, Direct ByteBuffer的特點有:

  • 它allocated到的memory區塊是不在之前談到的JVM head記憶體
  • 它在JVM的Garbage Collector的範圍之外
  • 它適合用在這個Buffer會活的很 久而且Buffer的size比較大的場景

MappedByteBuffer

常常我們使用記憶體技術來設計程式的時候, 如何把記憶體裡頭的資料persistent地保存下來, 一直是一個課題。而Memory mapped file (記憶體區塊的檔案映射)技術就是大家可以拿出來使用的高明招式了。

在Java中, MemoryMapped file也是利用了以上所提及的Direct ByteBuffer, 在這次的驗證中我們也來看看MappedByteBuffer的read/write throughput會是如何。

OffHeap Memory

Java為了避免像我這種初階的程式設計師對於記憶體管理的設計不當而把Server都搞掛了, 於是使用了GC來自動處理記憶體管理的複雜問題。雖然很方便, 可是也因此而讓GC成為寫high performance/high throughput的程式時的一種不容易去控制的變動因素。 雖然如此, Java還是提供一個Unsafe的類別來讓我們自己去allocate與unallocate memory(唉 !人就是這樣, 你愈告訴我那個地方很危險, 我就是愈想去玩!!)。

讓我們深入地去看看這幾種記憶體allocation的read/write throughput吧!

在這次的驗證中, 我們將在記憶體read/write 一種13個byte的message (int -4 bytes, long – 8 bytes 及byte – 1 byte)。而且這次的驗證中, 我們不去評測memory allocation/deallocation的速度而是將專注在讀寫的效率之上。

循序寫(Sequential Write)的效率

X軸: Test Round

Y軸: 每秒可以完成多少次寫入的動作(百萬次為一單元) -- op/second (in Millions)

  1. DirectByteBuffer, MappedByteBuffer與OffHeap的效率大致差異不多, 大約落在280 Million/Second
  2. Heap的表現中規中矩 160 Million/Second
  3. ByteBuffer的表現則最不好, 大約只有50 Million/Second
  4. DirectByteBuffer, MappedByteBuffer與OffHeap的效率比Heap快80%, 比ByteBuffer快了460%

循序讀(Sequential Read)的效率

X軸: Test Round

Y軸: 每秒可以完成多少次讀出的動作(百萬次為一單元) -- op/second (in Millions)

  1. DirectByteBuffer與MappedByteBuffer效率大致接近, 大約落在18000 Million/Second
  2. OffHead的效率只有10000 Million/Second, 比DirectByteBuffer或MappedByteBuffer慢了80% (怎麼會這樣呢? 我百思不得其解)。
  3. Heap的表現很差, 平均只有900 Million/Second
  4. ByteBuffer的表現則慘不忍睹地只有120 Million/Second

隨機讀(Random Read)的效率

X軸: Test Round

Y軸: 每秒可以完成多少次讀出的動作(百萬次為一單元) -- op/second (in Millions)

  1. DirectByteBuffer, MappedByteBuffer, ByteBuffer與OffHeap效率大致接近, 大約落在3000 Million/Second
  2. Heap的表現跟循序讀取的效能差不多, 平均只有900 Million/Second

結論

  • DirectByteBuffer與 MappedByteBuffer在讀寫的表現都令人讚賞, 尤其是循序讀的效能突出令人驚豔
  • Heap (Java Array)的效能穩定, 但沒有特別的看頭
  • OffHeap在循序讀取的效能上有點令我跌破眼鏡, 照理來說不應有如此之現象(我強烈懷疑something wrong!? 如果你有任何發現請告訴本草民)
  • ByteBuffer的效能也很怪, 難怪在很多的開放源碼裡頭都使用DirectByteBuffer (不僅快, 而且還可以Zero-copy來減少因Buffer於Socket與Java Heap中間的copy過來與copy過去而讓效能下降的問題)

測試源始碼在這裡

測試後的分析資料在這裡

2013年5月6日 星期一

Storm入門教程 第五章 一致性事務


Storm入門教程 第五章 一致性事務


原文作者: 木晗

5 Storm消息一致性事務

Storm是一個分散式的stream處理系統,利用anchorack機制保證所有tuple都被成功處理。如果tuple出錯,則訊息可以被重傳,但是如何保證出錯的tuple只被處理一次呢?Storm提供了一套事務性元件Transaction Topology,用來解決這個問題。
Transactional Topology目前已經不再維護,由Trident來實現事務性topology,但是原理相同。
5.1 一致性事務的設計
Storm如何實現對tuple平行處理,而又能保證事務性transactional。本節從簡單的事務性實現方法入手,逐步引出Transactional Topology的原理。
5.1.1 簡單設計一:強順序流
保證tuple只被處理一次,最簡單的方法就是將tuple流變成強順序的,並且每次只處理一個tuple。從1開始,給每個tuple都順序加上一個id。在處理tuple的時候,將處理成功的tuple id和計算結果存在資料庫中。下一個tuple到來的時候,將其id與資料庫中的id做比較。如果相同,則說明這個tuple已經被成功處理過了,忽略它;如果不同,根據強順序性,說明這個tuple沒有被處理過,將它的id及計算結果更新到資料庫中。
以統計消息總數為例。每來一個tuple,如果資料庫中存儲的id 與當前tuple id不同,則資料庫中的消息總數加1,同時更新資料庫中的當前tuple id值。如圖:

但是這種機制使得系統一次只能處理一個tuple,無法實現分散式運算。
5.1.2 簡單設計二:強順序batch
為了實現分散式,我們可以每次處理一批tuple,稱為一個batch。一個batch中的tuple可以被平行處理。
我們要保證一個batch只被處理一次,機制和上一節類似。只不過資料庫中存儲的是batch idbatch的中間計算結果先存在區域變數中,當一個batch中的所有tuple都被處理完之後,判斷batch id,如果跟資料庫中的id不同,則將中間計算結果更新到資料庫中。
如何確保一個batch裡面的所有tuple都被處理完了呢?可以利用Storm提供的CoordinateBolt。如圖:

但是強順序batch流也有局限,每次只能處理一個batchbatch之間無法並行。要想實現真正的分散式交易處理,可以使用storm提供的Transactional Topology。在此之前,我們先詳細介紹一下CoordinateBolt的原理。
5.1.3 CoordinateBolt原理
CoordinateBolt具體原理如下:
  • 真正執行計算的bolt外面封裝了一個CoordinateBolt。真正執行任務的bolt我們稱為real bolt
  • 每個CoordinateBolt記錄兩個值:有哪些task給我發送了tuple(根據topologygrouping資訊);我要給哪些tuple發送資訊(同樣根據groping資訊)
  • Real bolt發出一個tuple後,其外層的CoordinateBolt會記錄下這個tuple發送給哪個task了。
  • 等所有的tuple都發送完了之後,CoordinateBolt通過另外一個特殊的streamemitDirect的方式告訴所有它發送過tupletask,它發送了多少tuple給這個task。下游task會將這個數字和自己已經接收到的tuple數量做對比,如果相等,則說明處理完了所有的tuple
  • 下游CoordinateBolt會重複上面的步驟,通知其下游。

整個過程如圖所示:


CoordinateBolt主要用於兩個場景:
  • DRPC
  • Transactional Topology
CoordinatedBolt對於業務是有侵入的,要使用CoordinatedBolt提供的功能,你必須要保證你的每個bolt發送的每個tuple的第一個fieldrequest-id 所謂的我已經處理完我的上游的意思是說當前這個bolt對於當前這個request-id所需要做的工作做完了。這個request-idDRPC裡面代表一個DRPC請求;在Transactional Topology裡面代表一個batch
5.1.4 Trasactional Topology
Storm提供的Transactional Topologybatch計算分為processcommit兩個階段。Process階段可以同時處理多個batch,不用保證順序性;commit階段保證batch的強順序性,並且一次只能處理一個batch,第1batch成功提交之前,第2batch不能被提交。
還是以統計消息總數為例,以下代碼來自storm-starter裡面的TransactionalGlobalCount

MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA,new Fields(“word“), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder(“global-count“, “spout“, spout, 3);
builder.setBolt(“partial-count“, new BatchCount(), 5).noneGrouping(“spout“);
builder.setBolt(“sum“, new UpdateGlobalCount()).globalGrouping(“partial-count“); 
 
TransactionalTopologyBuilder共接收四個參數。
  • 這個Transactional TopologyidId用來在Zookeeper中保存當前topology的進度,如果這個topology重啟,可以繼續之前的進度執行。
  • Spout在這個topology中的id
  • 一個TransactionalSpout。一個Trasactional Topology中只能有一個TrasactionalSpout.在本例中是一個MemoryTransactionalSpout,從一個記憶體變數(DATA)中讀取資料。
  • TransactionalSpout的並行度(可選)。
下面是BatchCount的定義:

public static class BatchCount extends BaseBatchBolt {
    Object _id;
    BatchOutputCollector _collector;
    int _count = 0;
 
    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
        _collector = collector;
        _id = id;
    }

    @Override
    public void execute(Tuple tuple) {
        _count++;
    }

    @Override
    public void finishBatch() {
        _collector.emit(new Values(_id, _count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(“id“, “count“));
    }
}

BatchCountprepare方法的最後一個參數是batch id,在Transactional Tolpoloyg裡面這id是一個TransactionAttempt物件。
Transactional Topology裡發送的tuple都必須以TransactionAttempt作為第一個fieldstorm根據這個field來判斷tuple屬於哪一個batch
TransactionAttempt包含兩個值:一個transaction id,一個attempt idtransaction id的作用就是我們上面介紹的對於每個batch中的tuple是唯一的,而且不管這個batch replay多少次都是一樣的。attempt id是對於每個batch唯一的一個id 但是對於同一個batch,它replay之後的attempt idreplay之前就不一樣了, 我們可以把attempt id理解成replay-times storm利用這個id來區別一個batch發射的tuple的不同版本。
execute方法會為batch裡面的每個tuple執行一次,你應該把這個batch裡面的計算狀態保持在一個本地變數裡面。對於這個例子來說, 它在execute方法裡面遞增tuple的個數。
最後, 當這個bolt接收到某個batch的所有的tuple之後, finishBatch方法會被調用。這個例子裡面的BatchCount類會在這個時候發射它的局部數量到它的輸出流裡面去。
下面是UpdateGlobalCount類的定義:

public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
    TransactionAttempt _attempt;
    BatchOutputCollector _collector;
    int _sum = 0;

    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
        _collector = collector;
        _attempt = attempt;
    }

    @Override
    public void execute(Tuple tuple) {
        _sum+=tuple.getInteger(1);
    }

    @Override
    public void finishBatch() {
        Value val = DATABASE.get(GLOBAL_COUNT_KEY);
        Value newval;
        if(val == null || !val.txid.equals(_attempt.getTransactionId())) {
            newval = new Value();
            newval.txid = _attempt.getTransactionId();
            if(val==null) {
                newval.count = _sum;
            } else {
                newval.count = _sum + val.count;
            }
            DATABASE.put(GLOBAL_COUNT_KEY, newval);
        } else {
            newval = val;
        }
        _collector.emit(new Values(_attempt, newval.count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(“id“, “sum“));
    }
}

UpdateGlobalCount實現了ICommitter介面,所以storm只會在commit階段執行finishBatch方法。而execute方法可以在任何階段完成。
UpdateGlobalCountfinishBatch方法中,將當前的transaction id與資料庫中存儲的id做比較。如果相同,則忽略這個batch;如果不同,則把這個batch的計算結果加到總結果中,並更新資料庫。
Transactional Topology運行示意圖如下:



下面總結一下Transactional Topology的一些特性
  • Transactional Topology將事務性機制都封裝好了,其內部使用CoordinateBolt來保證一個batch中的tuple被處理完。
  • TransactionalSpout只能有一個,它將所有tuple分為一個一個的batch,而且保證同一個batchtransaction id始終一樣。
  • BatchBolt處理batch在一起的tuples。對於每一個tuple調用execute方法,而在整個batch處理完成的時候調用finishBatch方法。
  • 如果BatchBolt被標記成Committer,則只能在commit階段調用finishBolt方法。一個batchcommit階段由storm保證只在前一個batch成功提交之後才會執行。並且它會重試直到topology裡面的所有boltcommit完成提交。
  • Transactional Topology隱藏了anchor/ack框架,它提供一個不同的機制來fail一個batch,從而使得這個batchreplay
5.2 Trident介紹
TridentStorm之上的高級抽象,提供了joinsgroupingaggregationsfunctionsfilters等介面。如果你使用過PigCascading,對這些介面就不會陌生。
Tridentstream中的tuples分成batches進行處理,API封裝了對這些batches的處理過程,保證tuple只被處理一次。處理batches中間結果存儲在TridentState物件中。
Trident事務性原理這裡不詳細介紹,有興趣的讀者請自行查閱資料。
參考: