2013年5月6日 星期一

Storm入門教程 第二章 構建Topology


Storm入門教程 第二章 構建Topology


原文作者: 毅山,宋智

2 Storm Topology構建

2.1 Storm基本概念

在運行一個Storm任務之前,需要瞭解一些概念:

  1. Topologies
  2. Streams
  3. Spouts
  4. Bolts
  5. Stream groupings
  6. Reliability
  7. Tasks
  8. Workers
  9. Configuration

Storm集群和Hadoop集群表面上看很類似。但是Hadoop上運行的是MapReduce jobs,而在Storm上運行的是拓撲(topology),這兩者之間是非常不一樣的。一個關鍵的區別是: 一個MapReduce job最終會結束, 而一個topology卻會永遠的運行(除非你手動kill掉)。

Storm的集群裡面有兩種節點: 控制節點(master node)和工作節點(worker node)。控制節點上面運行一個叫Nimbus背景程式,它的作用類似Hadoop裡面的JobTrackerNimbus負責在集群裡面分發代碼,分配計算任務給機器, 並且監控狀態。

每一個工作節點上面運行一個叫做Supervisor的節點。Supervisor會監聽分配給它那台機器的工作,根據需要啟動/關閉工作進程。每一個工作進程執行一個topology的一個子集;一個運行的topology由運行在很多機器上的很多工作進程組成。

 


NimbusSupervisor之間的所有協調工作都是通過Zookeeper集群完成。另外,Nimbus進程和Supervisor進程都是快速失敗(fail-fast)和無狀態的。所有的狀態要麼在zookeeper裡面, 要麼在本地磁片上。這也就意味著你可以用kill -9來殺死NimbusSupervisor進程, 然後再重啟它們,就好像什麼都沒有發生過。這個設計使得Storm異常的穩定。

2.1.1 Topologies

一個topologyspoutsbolts組成的圖, 通過stream groupings將圖中的spoutsbolts連接起來,如下圖:

一個topology會一直運行直到你手動kill掉,Storm自動重新分配執行失敗的任務, 並且Storm可以保證你不會有資料丟失(如果開啟了高可靠性的話)。如果一些機器意外停機它上面的所有任務會被轉移到其他機器上。

運行一個topology很簡單。首先,把你所有的代碼以及所依賴的jar檔都包進一個jar包。然後運行類似下面的這個命令:

storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

這個命令會運行: backtype.strom.MyTopology類別, 參數是arg1, arg2。這個類別的main函式定義這個topology並且把它提交給Nimbusstorm jar負責連接到Nimbus並且上傳jar包。

Topology的定義是一個Thrift結構,並且Nimbus就是一個Thrift服務, 你可以提交由任何語言創建的topology。上面的方面是使用JVM-based語言提交的最簡單的方法。

2.1.2 Streams

消息流streamstorm裡的關鍵抽象。一個消息流是一個沒有邊界的tuple序列, 而這些tuple序列會以一種分散式的方式並行地被創建和處理。通過對streamtuple序列中每個欄位命名來定義stream。在預設的情況下,tuple的欄位類型可以是:integerlongshort bytestringdoublefloatbooleanbyte array。你也可以自訂類型(只要實現相應的序列化器)。

每個消息流在定義的時候會被分配給一個id (MessageId),因為單向消息流使用的相當普遍, OutputFieldsDeclarer定義了一些方法讓你可以定義一個stream而不用指定這個id。在這種情況下這個stream會分配個值為'default'默認的id

Storm提供的最基本的處理stream的原語是spoutbolt。你可以實現spoutbolt提供的介面來處理你的業務邏輯。

2.1.3 Spouts

訊息源spoutStorm裡面一個topology裡面的消息生產者。一般來說訊息源會從一個外部源讀取資料並且向topology裡面發出消息:tupleSpout可以是可靠的也可以是不可靠的。如果這個tuple沒有被storm成功處理,可靠的訊息源spouts可以重新發射一個tuple 但是不可靠的訊息源spouts一旦發出一個tuple就不能重發了。

訊息源可以發射多條消息流stream。使用OutputFieldsDeclarer.declareStream來定義多個stream,然後使用SpoutOutputCollector來發射指定的stream

Spout類裡面最重要的方法是nextTuple。要麼發射一個新的tupletopology裡面或者簡單的返回如果已經沒有新的tuple。要注意的是nextTuple方法不能被阻塞blocking,因為storm在同一個執行緒上面調用所有訊息源spout的方法。

另外兩個比較重要的spout方法是ackfailstorm在檢測到一個tuple被整個topology成功處理的時候調用ack,否則調用failstorm只對可靠的spout調用ackfail

2.1.4 Bolts

所有的消息處理邏輯被封裝在bolts裡面。Bolts可以做很多事情:過濾,聚合,查詢資料庫等等。

Bolts可以簡單的做消息流的傳遞。複雜的消息流處理往往需要很多步驟,從而也就需要經過很多bolts。比如算出一堆圖片裡面被轉發最多的圖片就至少需要兩步:第一步算出每個圖片的轉發數量。第二步找出轉發最多的前10個圖片。(如果要把這個過程做得更具有擴展性那麼可能需要更多的步驟)。

Bolts可以發射多條消息流, 使用OutputFieldsDeclarer.declareStream定義stream,使用OutputCollector.emit來選擇要發射的stream

Bolts的主要方法是execute, 它以一個tuple作為輸入,bolts使用OutputCollector來發射tuplebolts必須要為它處理的每一個tuple調用OutputCollectorack方法,以通知Storm這個tuple被處理完成了,從而通知這個tuple的發射者spouts 一般的流程是: bolts處理一個輸入tuple, 發射0個或者多個tuple, 然後調用ack通知storm自己已經處理過這個tuple了。storm提供了一個IBasicBolt會自動調用ack

2.1.5 Stream groupings

定義一個topology的其中一步是定義每個bolt接收什麼樣的流作為輸入。stream grouping就是用來定義一個stream應該如果分配資料給bolts上面的多個tasks

Storm裡面有7種類型的stream grouping

  1. Shuffle Grouping: 隨機分組, 隨機派發stream裡面的tuple,保證每個bolt接收到的tuple數目大致相同。
  2. Fields Grouping:按欄位分組, 比如按userid來分組, 具有同樣useridtuple會被分到相同的Bolts裡的一個task 而不同的userid則會被分配到不同的bolts裡的task
  3. All Grouping:廣播發送,對於每一個tuple,所有的bolts都會收到。
  4. Global Grouping:全域分組, 這個tuple被分配到storm中的一個bolt的其中一個task。再具體一點就是分配給id值最低的那個task
  5. Non Grouping:不分組,這個分組的意思是說stream不關心到底誰會收到它的tuple。目前這種分組和Shuffle grouping是一樣的效果, 有一點不同的是storm會把這個bolt放到這個bolt的訂閱者同一個執行緒裡面去執行。
  6. Direct Grouping 直接分組, 這是一種比較特別的分組方法,用這種分組意味著消息的發送者指定由消息接收者的哪個task處理這個消息。 只有被聲明為Direct Stream的消息流可以聲明這種分組方法。而且這種消息tuple必須使用emitDirect方法來發射。消息處理者可以通過TopologyContext來獲取處理它的消息的taskid OutputCollector.emit方法也會返回taskid)。
  7. Local or shuffle grouping:如果目標bolt有一個或者多個task在同一個工作進程中,tuple將會被隨機發生給這些tasks。否則,和普通的Shuffle Grouping行為一致。

2.1.6 Reliability

Storm保證每個tuple會被topology完整的執行。Storm會追蹤由每個spout tuple所產生的tuple樹(一個bolt處理一個tuple之後可能會發射別的tuple從而形成樹狀結構),並且跟蹤這棵tuple樹什麼時候成功處理完。每個topology都有一個消息超時的設置,如果storm在這個超時的時間內檢測不到某個tuple樹到底有沒有執行成功, 那麼topology會把這個tuple標記為執行失敗,並且過一會兒重新發射這個tuple

為了利用Storm的可靠性特性,在你發出一個新的tuple以及你完成處理一個tuple的時候你必須要通知storm。這一切是由OutputCollector來完成的。通過emit方法來通知一個新的tuple產生了,通過ack方法通知一個tuple處理完成了。

Storm的可靠性我們在第四章會深入介紹。

2.1.7 Tasks

每一個spoutbolt會被當作很多task在整個集群裡執行。每一個executor對應到一個執行緒,在這個執行緒上運行多個task,而stream grouping則是定義怎麼從一堆task發射tuple到另外一堆task。你可以調用TopologyBuilder類的setSpoutsetBolt來設置並行度(也就是有多少個task)。

2.1.8 Workers

一個topology可能會在一個或者多個worker(工作進程)裡面執行,每個worker是一個物理JVM並且執行整個topology的一部分。比如,對於並行度是300topology來說,如果我們使用50個工作進程來執行,那麼每個工作進程會處理其中的6tasksStorm會儘量均勻的工作分配給所有的worker

2.1.9 Configuration

Storm裡面有一堆參數可以配置來調整Nimbus, Supervisor以及正在運行的topology的行為,一些配置是系統級別的,一些配置是topology級別的。default.yaml裡面有所有的預設配置。你可以通過定義個storm.yaml在你的classpath裡來覆蓋這些預設配置。並且你也可以在代碼裡面設置一些topology相關的配置資訊(使用StormSubmitter)。

2.2 構建Topology

2.2.1 實現的目標

我們將設計一個topology,來實現對一個句子裡面的單詞出現的頻率進行統計。這是一個簡單的例子,目的是讓大家對於topology快速上手,有一個初步的理解。

2.2.2 設計Topology結構

在開始開發Storm專案的第一步,就是要設計topology。確定好你的資料處理邏輯,我們今天將的這個簡單的例子,topology也非常簡單。整個topology如下:



整個topology分為三個部分:

KestrelSpout:資料來源,負責發送被斷行的字句sentence

Splitsentence:負責將字句sentence切分成英文單字

Wordcount:負責對單字的頻率count進行累加

2.2.3 設計資料流程

這個topologykestrel queue讀取句子,並把句子劃分成單詞,然後匯總每個單詞出現的次數,一個tuple負責讀取句子,每一個tuple分別對應計算每一個單詞出現的次數,大概樣子如下所示:



2.2.4 代碼實現

1) 構建maven環境:

為了開發storm topology, 你需要把storm相關的jar包添加到classpath裡面去: 要麼手動添加所有相關的jar包, 要麼使用maven來管理所有的依賴。stormjar包發佈在Clojars(一個maven), 如果你使用maven的話,把下面的配置添加在你專案的pom.xml裡面。

<repository>
  <id>clojars.org</id>
  <url>http://clojars.org/repo</url>
</repository>
<dependency>
  <groupId>storm</groupId>
  <artifactId>storm</artifactId>
  <version>0.5.3</version>
  <scope>test</scope>
</dependency>

2) 定義topology

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(1, new KestrelSpout(“kestrel.backtype.com”,22133,”sentence_queue”,new StringScheme()));
builder.setBolt(2, new SplitSentence(), 10).shuffleGrouping(1);
builder.setBolt(3, new WordCount(), 20).fieldsGrouping(2, new Fields(“word”));

這種topologyspout從句子佇列中讀取句子,在kestrel.backtype.com位於一個Kestrel的伺服器埠22133

SpoutsetSpout方法插入一個自己命名uniqueidtopology Topology中的每個Topology中的spout或bolt節點都必須給予一個自己命名uniqueid,這個id是由其他bolts用於訂閱該節點的輸出流。 在這個範例中KestrelSpouttopologyid1(建議使用比較有意義的名字來命名)

setBolt是用於在Topology中插入bolts topology中定義的第bolts 是切割句子的bolts 這個bolts 將句子stream轉成成單詞stream

讓我們看看SplitSentence實施:

public class SplitSentence implements IBasicBolt{
    public void prepare(Map conf, TopologyContext context) {
    }

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);

        for(String word: sentence.split(“ ”)) {
            collector.emit(new Values(word));
        }
    }

    public void cleanup() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(“word”));
    }
}

關鍵的方法是 execute方法。 正如你可以看到,它將句子拆分成單詞,並發出每個單詞作為一個新的元組。 另一個重要的方法是declareOutputFields其中宣告bolts輸出tuple的資料架構。 在範例中宣佈,它發出一個Fieldsword的元組

setBolt的最後一個參數是你想為bolts的並行量。 SplitSentence bolts 10個併發,這將導致在storm集群中有十個執行緒並存執行。 你所要做的的是增加bolts的並行量在遇到topology的瓶頸時。

setBolt方法返回一個物件,用來定義bolts的輸入。 例如SplitSentence這個bolt訂閱元件“1”使用隨機分組的輸出流。 “1”是指已經定義KestrelSpout 我將解釋在某一時刻的隨機分組的一部分。 到目前為止,最要緊的是SplitSentence bolts會消費KestrelSpout發射出emit的每一個資料tuple

下面在讓我們看看wordcount的實現:

public class WordCount implements IBasicBolt {
    private Map<String, Integer> _counts = new HashMap<String, Integer>();
 
    public void prepare(Map conf, TopologyContext context) {
    }
 
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        int count;
        if(_counts.containsKey(word)) {
            count = _counts.get(word);
        } else {
            count = 0;
        }
        count++;
        _counts.put(word, count);
        collector.emit(new Values(word, count));
    }
 
    public void cleanup() {
    }
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(“word”, “count”));
    }
}

SplitSentence對於句子裡面的每個單詞發射一個新的tuple, WordCount在記憶體裡面維護一個[單詞]對應[次數]的mapping WordCount每收到一個單詞, 它就更新記憶體裡面的統計狀態。

2.2.5 運行Topology

storm的運行有兩種模式: 本地模式和分散式模式.

1) 本地模式:

storm用一個進程裡面的執行緒來模擬所有的spoutbolt. 本地模式對開發和測試來說比較有用。 你運行storm-starter裡面的topology的時候它們就是以本地模式運行的, 你可以看到topology裡面的每一個元件在發射emit什麼消息。

2) 分散式模式:

storm由一堆機器組成。當你提交topologymaster的時候, 你同時也把topology的代碼提交了。master負責分發你的代碼並且負責給你的topolgoy分配工作進程。如果一個工作進程掛掉了, master節點會重新分配到其它節點。

3) 下面是以本地模式運行的代碼:

Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(“test”, conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology(“test”);
cluster.shutdown();

首先, 這個代碼定義通過定義一個LocalCluster物件來定義一個進程內的集群。提交topology給這個虛擬的集群和提交topology給分散式集群是一樣的。通過調用submitTopology方法來提交topology 它接受三個參數:要運行的topology的名字,一個配置物件以及要運行的topology本身。

topology的名字是用來唯一區別一個topology的,之後可以用這個名字來結束kill這個topology的。前面已經說過了, 你必須自己下命令去kill一個topology 否則它會一直運行。

Conf物件可以配置很多東西, 下面兩個是最常見的:

TOPOLOGY_WORKERS(setNumWorkers) 定義你希望集群分配多少個工作進程給你來執行這個topology。 topology裡面的每個元件都會使用執行緒來執行。每個元件到底用多少個執行緒是通過setBoltsetSpout來指定的。這些執行緒都運行在工作進程裡面. 每一個工作進程包含一些節點的一些工作執行緒。比如, 如果你指定300個執行緒,60個進程, 那麼每個工作進程裡面要執行6個執行緒, 而這6個執行緒可能屬於不同的元件(Spout, Bolt)。你可以通過調整每個元件的並行度以及這些執行緒所在的進程數量來調整topology的性能。

TOPOLOGY_DEBUG(setDebug), 當它被設置成true的話, storm會記錄下每個元件所發射的每條消息。這在本地環境調試topology很有用, 但是在storm cluster上這麼做的話會影響性能的。

結論
本章從storm的基本物件的定義,到廣泛的介紹了storm的開發環境,從一個簡單的例子講解了topology的構建和定義。希望大家可以從本章的內容對storm有一個基本的理解和概念,並且已經可以構建一個簡單的topology

沒有留言: