Twitter Storm Source Code Walkthrough (Part 2)
Published: 2019-06-23


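Overview

A storm cluster can be pictured as a factory. nimbus takes orders from the outside world and assigns the work: besides accepting external orders, it also turns them into internal work assignments, so it doubles as the dispatch office. The supervisor is middle management, the foreman of a workshop, whose daily job is to wait for the dispatch office to hand down new work. As foreman, the supervisor does not do the assigned work himself; he has a crew of ordinary workers under him. To those workers the supervisor only ever shouts two things: start work, and stop work. Note that "stop work" does not mean the worker has finished the job at hand, only that it goes into a resting state.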

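Submitting a topology involves the following roles.

  • storm client    submits the user-created topology to nimbus
  • nimbus          receives the user-submitted topology through the thrift interface
  • supervisor      downloads the latest work assignment based on the notifications it gets through the zk (ZooKeeper) interface, and is responsible for starting workers
  • worker          runs tasks; each task is either a bolt or a spout
  • executor        a running thread; all tasks inside one executor are of the same type, that is, the tasks in one thread are either all bolts or all spouts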

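A worker corresponds to a process and an executor to a thread; one thread can run one or more tasks. Before version 0.8.0, each task had its own thread. Version 0.8.0 introduced the executor concept, which removed the one-to-one mapping between tasks and threads, and the tasks subtree that used to exist on the zookeeper server disappeared as well. For more on this change, see the reference linked from the original post.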
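To make the task/executor relationship concrete, here is a minimal Python sketch that splits a list of task ids across a given number of executors (threads). The function name `partition_tasks` and the contiguous, near-equal split are assumptions made for illustration; this is not Storm's own code.

```python
def partition_tasks(task_ids, num_executors):
    """Split task_ids into contiguous groups, one group per executor.

    Hypothetical helper: group sizes differ by at most one task.
    """
    if num_executors <= 0:
        raise ValueError("num_executors must be positive")
    # There is no point in having more executors than tasks.
    num_executors = min(num_executors, len(task_ids))
    if num_executors == 0:
        return []
    base, extra = divmod(len(task_ids), num_executors)
    groups = []
    start = 0
    for i in range(num_executors):
        # The first `extra` executors each take one additional task.
        size = base + (1 if i < extra else 0)
        groups.append(task_ids[start:start + size])
        start += size
    return groups


# Five spout tasks shared by two executor threads.
print(partition_tasks([1, 2, 3, 4, 5], 2))  # [[1, 2, 3], [4, 5]]
```

With one executor per task this degenerates to the pre-0.8.0 picture of one thread per task.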

 storm client

The storm client submits a topology to the storm cluster by running the command below. Assume the jar file is named storm-starter-0.0.1-SNAPSHOT-standalone.jar, the main class is storm.starter.ExclamationTopology, and the topology is given the name exclamationTopology.

#./storm jar $HOME/working/storm-starter/target/storm-starter-0.0.1-SNAPSHOT-standalone.jar storm.starter.ExclamationTopology exclamationTopology

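What does this one short command actually mean to the storm client? The source code holds no secrets, so let's open the storm client source file (the Python script bin/storm).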

def jar(jarfile, klass, *args):
    """Syntax: [storm jar topology-jar-path class ...]

    Runs the main method of class with the specified arguments.
    The storm jars and configs in ~/.storm are put on the classpath.
    The process is configured so that StormSubmitter
    (http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html)
    will upload the jar at topology-jar-path when the topology is submitted.
    """
    exec_storm_class(
        klass,
        jvmtype="-client",
        extrajars=[jarfile, USER_CONF_DIR, STORM_DIR + "/bin"],
        args=args,
        jvmopts=["-Dstorm.jar=" + jarfile])

def exec_storm_class(klass, jvmtype="-server", jvmopts=[],
                     extrajars=[], args=[], fork=False):
    global CONFFILE
    all_args = [
        "java", jvmtype, get_config_opts(),
        "-Dstorm.home=" + STORM_DIR,
        "-Djava.library.path=" + confvalue("java.library.path", extrajars),
        "-Dstorm.conf.file=" + CONFFILE,
        "-cp", get_classpath(extrajars),
    ] + jvmopts + [klass] + list(args)
    print "Running: " + " ".join(all_args)
    if fork:
        os.spawnvp(os.P_WAIT, "java", all_args)
    else:
        os.execvp("java", all_args) # replaces the current process and never returns
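The following Python 3 sketch shows roughly what argument list the launcher assembles for the example command. It is a simplified stand-in, not the real script: `build_java_command`, the `/opt/storm` install path, and the classpath layout are assumptions, and the config options and `java.library.path` handling of the original are left out.

```python
def build_java_command(klass, jarfile, args, storm_dir="/opt/storm", conf_file=""):
    """Return the argv list that a launcher like exec_storm_class passes to java.

    Hypothetical helper: storm_dir and the classpath entries are placeholders.
    """
    classpath = ":".join([storm_dir + "/lib/*", jarfile, storm_dir + "/bin"])
    return [
        "java", "-client",
        "-Dstorm.home=" + storm_dir,
        "-Dstorm.conf.file=" + conf_file,
        "-cp", classpath,
        "-Dstorm.jar=" + jarfile,  # this property names the jar to upload
        klass,                     # the class whose main method runs
    ] + list(args)                 # remaining arguments go to that main method


cmd = build_java_command(
    "storm.starter.ExclamationTopology",
    "storm-starter-0.0.1-SNAPSHOT-standalone.jar",
    ["exclamationTopology"],
)
print(" ".join(cmd))
```

The point to notice is that the topology name is just an ordinary program argument, while the jar path travels separately as the `storm.jar` system property.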

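Put simply, exec_storm_class runs the main function of the class passed in. The listing below shows the main function of WordCountTopology from storm-starter (the command above used ExclamationTopology, but the submission path is the same).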

public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
        conf.setNumWorkers(3);
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
}

On the storm client side, the key class StormSubmitter now shows its true face; its submitTopology method is what we really need to study.

public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts)
        throws AlreadyAliveException, InvalidTopologyException {
    if(!Utils.isValidConf(stormConf)) {
        throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
    }
    stormConf = new HashMap(stormConf);
    stormConf.putAll(Utils.readCommandLineOpts());
    Map conf = Utils.readStormConfig();
    conf.putAll(stormConf);
    try {
        String serConf = JSONValue.toJSONString(stormConf);
        if(localNimbus!=null) {
            LOG.info("Submitting topology " + name + " in local mode");
            localNimbus.submitTopology(name, null, serConf, topology);
        } else {
            NimbusClient client = NimbusClient.getConfiguredClient(conf);
            if(topologyNameExists(conf, name)) {
                throw new RuntimeException("Topology with name `"
                    + name
                    + "` already exists on cluster");
            }
            submitJar(conf);
            try {
                LOG.info("Submitting topology " + name
                    + " in distributed mode with conf " + serConf);
                if(opts!=null) {
                    client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
                } else {
                    // this is for backwards compatibility
                    client.getClient().submitTopology(name, submittedJar, serConf, topology);
                }
            } catch(InvalidTopologyException e) {
                LOG.warn("Topology submission exception", e);
                throw e;
            } catch(AlreadyAliveException e) {
                LOG.warn("Topology already alive exception", e);
                throw e;
            } finally {
                client.close();
            }
        }
        LOG.info("Finished submitting topology: " + name);
    } catch(TException e) {
        throw new RuntimeException(e);
    }
}

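submitTopology essentially does two things: first it uploads the jar file to the storm cluster, and then it notifies the storm cluster that the upload is complete and that the given topology can now be run.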
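The two steps can be sketched in Python against an in-memory stand-in for nimbus. Everything here is illustrative: `FakeNimbus`, `submit`, the inbox path, and the tiny chunk size are assumptions, and the method names only loosely mirror the file-upload and submit calls of the thrift interface.

```python
import json


class FakeNimbus:
    """In-memory stand-in for the nimbus thrift service (hypothetical)."""

    def __init__(self):
        self.uploads = {}     # upload location -> bytes received so far
        self.topologies = {}  # topology name -> (jar location, conf json, topology)
        self._next_id = 0

    def topology_name_exists(self, name):
        return name in self.topologies

    def begin_file_upload(self):
        self._next_id += 1
        location = "/nimbus/inbox/stormjar-%d.jar" % self._next_id  # placeholder path
        self.uploads[location] = bytearray()
        return location

    def upload_chunk(self, location, chunk):
        self.uploads[location].extend(chunk)

    def finish_file_upload(self, location):
        pass  # a real server would close the file here

    def submit_topology(self, name, jar_location, conf_json, topology):
        self.topologies[name] = (jar_location, conf_json, topology)


def submit(nimbus, name, jar_bytes, conf, topology, chunk_size=4):
    if nimbus.topology_name_exists(name):
        raise RuntimeError("Topology with name `%s` already exists on cluster" % name)
    # Step 1: upload the jar to nimbus, one chunk at a time.
    location = nimbus.begin_file_upload()
    for offset in range(0, len(jar_bytes), chunk_size):
        nimbus.upload_chunk(location, jar_bytes[offset:offset + chunk_size])
    nimbus.finish_file_upload(location)
    # Step 2: tell nimbus where the jar is and which topology to run.
    nimbus.submit_topology(name, location, json.dumps(conf), topology)
    return location


nimbus = FakeNimbus()
where = submit(nimbus, "exclamationTopology", b"0123456789",
               {"topology.workers": 3}, "topology-placeholder")
print(where, len(nimbus.uploads[where]))
```

Note that only the location of the uploaded jar, not the jar itself, is passed along in the second step.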

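First look at submitJar, the function that uploads the jar file. Its call relationships are shown in the figure below.

Next come the call relationships for the second step. I drew the diagrams with tikz/pgf and generated them as PDF.

In both call graphs, the functions at the subtree positions are all declared in storm.thrift; if you have forgotten them, look back at the description of storm.thrift in section 1.3 earlier. On the client side these functions are all generated automatically by thrift.

For reasons of space and time it is not covered here, but the source in TopologyBuilder.java is also very important on the storm client side when submitting a topology.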

nimbus

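Through the thrift interface, the storm client sends the jar to nimbus and then has the uploaded topology handled by the predefined submitTopologyWithOpts. So how does nimbus receive the file step by step, break the work down, and finally hand it to the supervisors?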

submitTopologyWithOpts

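Once again everything starts with thrift. The service-handler in nimbus.clj is the concrete implementation of the Nimbus interface defined in thrift; the code is too long to list here. The main thing to look at is how it implements submitTopologyWithOpts.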

(^void submitTopologyWithOpts
  [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
   ^SubmitOptions submitOptions]
  (try
    (assert (not-nil? submitOptions))
    (validate-topology-name! storm-name)
    (check-storm-active! nimbus storm-name false)
    (.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus)
               storm-name
               (from-json serializedConf)
               topology)
    (swap! (:submitted-count nimbus) inc)
    (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs))
          storm-conf (normalize-conf
                      conf
                      (-> serializedConf
                          from-json
                          (assoc STORM-ID storm-id)
                          (assoc TOPOLOGY-NAME storm-name))
                      topology)
          total-storm-conf (merge conf storm-conf)
          topology (normalize-topology total-storm-conf topology)
          topology (if (total-storm-conf TOPOLOGY-OPTIMIZE)
                     (optimize-topology topology)
                     topology)
          storm-cluster-state (:storm-cluster-state nimbus)]
      (system-topology! total-storm-conf topology) ;; this validates the structure of the topology
      (log-message "Received topology submission for " storm-name " with conf " storm-conf)
      ;; lock protects against multiple topologies being submitted at once and
      ;; cleanup thread killing topology in b/w assignment and starting the topology
      (locking (:submit-lock nimbus)
        (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
        (.setup-heartbeats! storm-cluster-state storm-id)
        (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
                                        TopologyInitialStatus/ACTIVE :active}]
          (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions))))
        (mk-assignments nimbus)))
    (catch Throwable e
      (log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
      (throw e))))

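The storm cluster creates a directory structure on the zookeeper server; the source file related to the directory structure is config.clj.

In plain words, the function above first runs the necessary checks on the uploaded topology, including its name, file contents, and format, much like the medical exam you take before joining a company. Once these are done it enters the critical section, and because it is a critical section, it takes a lock.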
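The validate-then-lock flow, including how the storm-id is built from the name, a submission counter, and the current time in seconds, can be sketched in Python as below. `NimbusState`, `validate_topology_name`, the set of rejected characters, and the single dictionary write inside the lock are all simplifying assumptions, not the real nimbus logic.

```python
import threading
import time


class NimbusState:
    """Hypothetical, minimal stand-in for the state nimbus keeps."""

    def __init__(self):
        self.submitted_count = 0
        self.submit_lock = threading.Lock()
        self.active = {}  # storm name -> storm id


def validate_topology_name(name):
    # Assumed rule for illustration: non-empty and free of path-like characters.
    if not name or any(ch in name for ch in "/.:\\"):
        raise ValueError("invalid topology name: %r" % name)


def submit_topology(state, name, now=None):
    # The "medical exam": run the checks before touching any shared state.
    validate_topology_name(name)
    if name in state.active:
        raise RuntimeError("topology %r is already active" % name)
    state.submitted_count += 1
    now = int(time.time()) if now is None else now
    storm_id = "%s-%d-%d" % (name, state.submitted_count, now)
    # Critical section: in the listing above this is where the code is set up,
    # heartbeats are prepared, the topology is started and assignments are made.
    with state.submit_lock:
        state.active[name] = storm_id
    return storm_id


state = NimbusState()
print(submit_topology(state, "exclamationTopology", now=1400000000))
# exclamationTopology-1-1400000000
```

The fixed `now` argument exists only to make the example deterministic.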

normalize-topology

(defn all-components [^StormTopology topology]
  (apply merge {}
         (for [f thrift/STORM-TOPOLOGY-FIELDS]
           (.getFieldValue topology f))))

Once all the components have been listed, the configuration of each component can be read out.
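As a rough Python equivalent of all-components, the sketch below merges the per-kind component maps of a topology into one dictionary keyed by component id. Representing the topology as a plain dict with `spouts`, `bolts`, and `state_spouts` keys, and the `parallelism` entries, are assumptions made for illustration.

```python
def all_components(topology):
    """Merge every component map of a topology into a single dict."""
    merged = {}
    for field in ("spouts", "bolts", "state_spouts"):
        merged.update(topology.get(field, {}))
    return merged


topology = {
    "spouts": {"spout": {"parallelism": 5}},
    "bolts": {"split": {"parallelism": 8}, "count": {"parallelism": 12}},
    "state_spouts": {},
}
for component_id, component in sorted(all_components(topology).items()):
    print(component_id, component["parallelism"])
```

The parallelism numbers reuse the values from the WordCountTopology listing earlier in the article.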

mk-assignments

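The key step executed inside this critical section is the function mk-assignments. mk-assignments has two main jobs. The first is to work out how many tasks there are, that is, how many spouts and how many bolts. The second is to build on that calculation and write the assignments through the zookeeper API, so that the supervisors notice there is new work to claim.

Take the second point first, because its logic is simple. mk-assignments runs the following code to set the corresponding data in zookeeper so that the supervisors can notice that new work has appeared.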

(doseq [[topology-id assignment] new-assignments
        :let [existing-assignment (get existing-assignments topology-id)
              topology-details (.getById topologies topology-id)]]
  (if (= existing-assignment assignment)
    (log-debug "Assignment for " topology-id " hasn't changed")
    (do
      (log-message "Setting new assignment for topology id " topology-id ": "
                   (pr-str assignment))
      (.set-assignment! storm-cluster-state topology-id assignment))))

The call relationships are shown in the figure below.
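The compare-then-write logic of this loop can be sketched in Python as follows. The function name `publish_changed_assignments` and the use of a plain dict in place of zookeeper are assumptions for illustration.

```python
def publish_changed_assignments(new_assignments, existing_assignments, store):
    """Write only those assignments that differ from what is already stored.

    `store` stands in for zookeeper; returns the topology ids that were written.
    """
    written = []
    for topology_id, assignment in new_assignments.items():
        if existing_assignments.get(topology_id) == assignment:
            continue  # unchanged, so supervisors need no notification
        store[topology_id] = assignment
        written.append(topology_id)
    return written


existing = {"wc-1-100": {"executors": 3}}
new = {"wc-1-100": {"executors": 3}, "ex-2-200": {"executors": 2}}
zk = dict(existing)
print(publish_changed_assignments(new, existing, zk))  # ['ex-2-200']
```

Skipping unchanged assignments matters because every write would otherwise wake up the watching supervisors for nothing.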

 

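The calculation behind the first point is more involved and needs to be walked through carefully. The central question in it is how work gets distributed, that is, scheduling.

You may have noticed the directory src/clj/backtype/storm/scheduler, or the scheduler-related options in storm.yaml. So when does the scheduler actually come into play? mk-assignments indirectly calls a function with a rather odd-looking name, compute-new-topology->executor->node+port, and it is inside this odd function that the scheduler is invoked.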

_ (.schedule (:scheduler nimbus) topologies cluster)
new-scheduler-assignments (.getAssignments cluster)
;; add more information to convert SchedulerAssignment to Assignment
new-topology->executor->node+port (compute-topology->executor->node+port new-scheduler-assignments)]

The assignments computed by schedule are stored in Cluster.java, which is why new-scheduler-assignments is read from it. With the assignments in hand, the corresponding node and port can be computed, which in effect says which worker on which supervisor should run the task.
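To illustrate what an executor to node+port mapping looks like, here is a deliberately naive round-robin scheduler in Python. It is not one of Storm's schedulers: the function name, the slot list, and the round-robin policy are assumptions chosen only to show the shape of the result.

```python
def schedule_round_robin(executors, slots):
    """Assign each executor to a (node, port) slot in round-robin order.

    executors: list of (first_task_id, last_task_id) tuples
    slots:     list of (node_id, port) tuples, one per available worker slot
    """
    if not slots:
        raise ValueError("no free slots to schedule on")
    return {executor: slots[i % len(slots)] for i, executor in enumerate(executors)}


executors = [(1, 2), (3, 4), (5, 5)]
slots = [("supervisor-a", 6700), ("supervisor-b", 6700)]
for executor, (node, port) in schedule_round_robin(executors, slots).items():
    print(executor, "->", node, port)
```

Each (node, port) pair identifies one worker process on one supervisor, which is exactly the information a supervisor needs to know which workers to start.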

The directory structure that storm creates on the zookeeper server is shown in the figure below.

 

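With this directory structure in hand, the questions to answer are: which directories get written when a topology is submitted? A new directory for the newly submitted topology is created under the assignments directory; what does the data structure of the data written there look like?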

 

supervisor

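As soon as a new assignment is written to zookeeper, the callback mk-synchronize-supervisor in the supervisor is woken up and runs.

Its main logic is to read the full set of new assignments from the zookeeper server, compare them with the assignments already running on the local machine, and work out which ones are new. The workers that run the actual tasks are then brought up in the sync-processes function.

To explain clearly what the supervisor has to do during topology submission, the most important thing is to understand the logic of the following two functions.

  • mk-synchronize-supervisor    When the contents of the assignments subdirectory on the zookeeper server change, the supervisor receives a notification, and the callback that handles it is mk-synchronize-supervisor. It reads all assignments, including those it does not handle itself, along with their details. It then works out which assignments are allocated to this supervisor and which of those are new. Once the new assignments are known, it downloads the jar files from the corresponding directory on nimbus; the user's own processing code is not uploaded to the zookeeper server but lives on the disk of the machine running nimbus.
  • sync-processes    After mk-synchronize-supervisor has finished the assignment-related preprocessing, it hands the actual starting of workers to an event-manager, which runs in a separate thread. One of the main functions run on that thread is sync-processes. sync-processes shuts down the running workers that no longer match the current assignment and then launches workers with the new run parameters.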
(defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
  (fn this []
    (let [conf (:conf supervisor)
          storm-cluster-state (:storm-cluster-state supervisor)
          ^ISupervisor isupervisor (:isupervisor supervisor)
          ^LocalState local-state (:local-state supervisor)
          sync-callback (fn [& ignored] (.add event-manager this))
          assignments-snapshot (assignments-snapshot storm-cluster-state sync-callback)
          storm-code-map (read-storm-code-locations assignments-snapshot)
          downloaded-storm-ids (set (read-downloaded-storm-ids conf))
          ;; read assignments from zookeeper
          all-assignment (read-assignments
                           assignments-snapshot
                           (:assignment-id supervisor))
          new-assignment (->> all-assignment
                              (filter-key #(.confirmAssigned isupervisor %)))
          ;; tasks are in the assignment
          assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
          existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)]
      (log-debug "Synchronizing supervisor")
      (log-debug "Storm code map: " storm-code-map)
      (log-debug "Downloaded storm ids: " downloaded-storm-ids)
      (log-debug "All assignment: " all-assignment)
      (log-debug "New assignment: " new-assignment)

      ;; download code first
      ;; This might take awhile
      ;;   - should this be done separately from usual monitoring?
      ;; should we only download when topology is assigned to this supervisor?
      (doseq [[storm-id master-code-dir] storm-code-map]
        (when (and (not (downloaded-storm-ids storm-id))
                   (assigned-storm-ids storm-id))
          (log-message "Downloading code for storm id "
             storm-id
             " from "
             master-code-
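The decisions the supervisor makes during synchronization can be sketched in Python as below: pick out the executors assigned to this supervisor, then decide which topologies still need their code downloaded and which ports need a new worker. The function name, the dict layout of an assignment, and the placeholder code directories are assumptions for illustration, not the real data structures.

```python
def plan_supervisor_sync(assignments, my_id, downloaded_ids, local_ports):
    """Work out what one supervisor must download and start.

    assignments: {storm_id: {"code_dir": str,
                             "executors": {executor: (node_id, port)}}}
    Returns (mine, to_download, new_ports).
    """
    mine = {}  # port -> {"storm_id": ..., "executors": [...]}
    for storm_id, assignment in assignments.items():
        for executor, (node_id, port) in assignment["executors"].items():
            if node_id != my_id:
                continue  # read, but handled by some other supervisor
            slot = mine.setdefault(port, {"storm_id": storm_id, "executors": []})
            slot["executors"].append(executor)
    assigned_ids = {slot["storm_id"] for slot in mine.values()}
    # Only download code that is assigned here and not yet on local disk.
    to_download = sorted(assigned_ids - set(downloaded_ids))
    # Ports with no local worker yet need a new worker process.
    new_ports = sorted(port for port in mine if port not in local_ports)
    return mine, to_download, new_ports


assignments = {
    "wc-1-100": {"code_dir": "/nimbus/stormdist/wc-1-100",
                 "executors": {(1, 2): ("sup-a", 6700), (3, 4): ("sup-b", 6700)}},
    "ex-2-200": {"code_dir": "/nimbus/stormdist/ex-2-200",
                 "executors": {(1, 1): ("sup-a", 6701)}},
}
mine, to_download, new_ports = plan_supervisor_sync(
    assignments, "sup-a", downloaded_ids={"wc-1-100"}, local_ports={6700})
print(to_download, new_ports)  # ['ex-2-200'] [6701]
```

In this example supervisor sup-a already runs wc-1-100 on port 6700, so only the code for ex-2-200 is fetched and only port 6701 gets a new worker.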

Reposted from: http://bnuwo.baihongyu.com/
