(Repost) Understanding Storm's intra-process message flow (an excellent article)
2014-03-08 21:19
From: http://zhangzhenjj.iteye.com/blog/1937861?utm_source=tuicool
Understanding the Internal Message Buffers of Storm
JUN 21ST, 2013
Table of Contents
Internal messaging within Storm worker processes
Illustration
Detailed description
Worker processes
Executors
Where to go from here
How to configure Storm’s internal message buffers
How to configure Storm’s parallelism
Understand what’s going on in your Storm topology
Advice on performance tuning
When you are optimizing the performance of your Storm topologies it helps to understand how Storm’s internal message queues are configured and put to use. In this short article I will explain and illustrate how Storm version 0.8/0.9 implements the intra-worker
communication that happens within a worker process and its associated executor threads.
Internal messaging within Storm worker processes
Terminology: I will use the terms message and (Storm) tuple interchangeably in the following sections.
When I say “internal messaging” I mean the messaging that happens within a worker process in Storm, i.e. communication that is restricted to the same Storm machine/node. For this communication Storm relies on various message queues backed by LMAX Disruptor, which is a high-performance inter-thread messaging library.
Note that this communication within the threads of a worker process is different from Storm’s inter-worker communication, which normally happens across machines and thus over the network. For the latter Storm uses ZeroMQ by default (in Storm 0.9 there is experimental support for Netty as the network messaging backend). That is, ZeroMQ/Netty are used when a task in one worker process wants to send data to a task that runs in a worker process on a different machine in the Storm cluster.
So for your reference:
Intra-worker communication in Storm (inter-thread on the same Storm node): LMAX Disruptor
Inter-worker communication (node-to-node across the network): ZeroMQ or Netty
Inter-topology communication: nothing is built into Storm; you must take care of this yourself, e.g. with a messaging system such as Kafka/RabbitMQ, a database, etc.
If you do not know what the differences are between Storm’s worker processes, executor threads and tasks, please take a look at Understanding the Parallelism of a Storm Topology.
Illustration
Let us start with a picture before we discuss the nitty-gritty details in the next section.
Figure 1: Overview of a worker’s internal message queues in Storm. Queues related to a worker process are colored in red, queues related to the worker’s various executor threads are colored in green. For readability reasons I show only one worker process (though normally a single Storm node runs multiple such processes) and only one executor thread within that worker process (of which, again, there are usually many per worker process).
Detailed description
Now that you have had a first glimpse of Storm’s intra-worker messaging setup, we can discuss the details.
Worker processes
To manage its incoming and outgoing messages each worker process has a single receive thread that listens on the worker’s TCP port (as configured via supervisor.slots.ports).
The parameter topology.receiver.buffer.size determines the batch size that the receive thread uses to place incoming messages into the incoming queues of the worker’s executor threads. Similarly, each worker has a single send thread that is responsible for reading messages from the worker’s transfer queue and sending them over the network to downstream consumers. The size of the transfer queue is configured via topology.transfer.buffer.size.
The topology.receiver.buffer.size is the maximum number of messages that are batched together at once for appending to an executor’s incoming queue by the worker receive thread (which reads the messages from the network). Setting this parameter too high may cause a lot of problems (“heartbeat thread gets starved, throughput plummets”). The default value is 8 elements, and the value must be a power of 2 (this requirement comes indirectly from LMAX Disruptor).
// Example: configuring via Java API
Config conf = new Config();
conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 16); // default is 8
Note that, in contrast to the other buffer-size related parameters described in this article, topology.receiver.buffer.size does not actually configure the size of an LMAX Disruptor queue. Rather, it sets the size of a simple ArrayList that is used to buffer incoming messages because in this specific case the data structure does not need to be shared with other threads, i.e. it is local to the worker’s receive thread. But because the content of this buffer is used to fill a Disruptor-backed queue (the executor incoming queues) it must still be a power of 2. See launch-receive-thread! in backtype.storm.messaging.loader for details.
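Since several of the buffer sizes discussed here must be powers of 2, a small sanity check before building your configuration can save a failed deployment. The helper below is purely illustrative and not part of Storm’s API:

// Illustrative helper (not part of Storm): check the power-of-2 requirement
// before putting a buffer size into the topology configuration.
static boolean isPowerOfTwo(int n) {
    return n > 0 && (n & (n - 1)) == 0;
}

// e.g. isPowerOfTwo(16384) returns true, isPowerOfTwo(1000) returns false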
Each element of the transfer queue configured with topology.transfer.buffer.size is actually a list of tuples. The various executor send threads will batch outgoing tuples off their outgoing queues onto the transfer queue. The default value is 1024 elements.
// Example: configuring via Java API
conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32); // default is 1024
Executors
Each worker process controls one or more executor threads. Each executor thread has its own incoming queue and outgoing queue. As described above, the worker process runs a dedicated worker receive thread that is responsible for moving incoming messages to the appropriate incoming queue of the worker’s various executor threads. Similarly, each executor has its dedicated send thread that moves an executor’s outgoing messages from its outgoing queue to the “parent” worker’s transfer queue. The sizes of the executors’ incoming and outgoing queues are configured via topology.executor.receive.buffer.size and topology.executor.send.buffer.size, respectively.
Each executor has a single thread that handles the user logic for the spout/bolt (i.e. your application code), and a single send thread which moves messages from the executor’s outgoing queue to the worker’s transfer queue.
The topology.executor.receive.buffer.size is the size of the incoming queue for an executor. Each element of this queue is a list of tuples; tuples are appended in batch. The default value is 1024 elements, and the value must be a power of 2 (this requirement comes from LMAX Disruptor).
// Example: configuring via Java API
conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); // batched; default is 1024
The topology.executor.send.buffer.size is the size of the outgoing queue for an executor. Each element of this queue will contain a single tuple. The default value is 1024 elements, and the value must be a power of 2 (this requirement comes from LMAX Disruptor).
// Example: configuring via Java API
conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384); // individual tuples; default is 1024
Where to go from here
How to configure Storm’s internal message buffers
The various default values mentioned above are defined in conf/defaults.yaml. You can override these values globally in a Storm cluster’s conf/storm.yaml.
You can also configure these parameters per individual Storm topology via backtype.storm.Config in Storm’s Java API.
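For example, such per-topology overrides can be bundled with the topology when it is submitted. The following is a minimal sketch; MySpout, MyBolt and the topology name are hypothetical placeholders, while the Config keys are the ones discussed in this article:

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class BufferTuningTopology {
    public static void main(String[] args) throws Exception {
        // MySpout and MyBolt are hypothetical placeholders for your own components.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new MySpout(), 2);
        builder.setBolt("bolt", new MyBolt(), 4).shuffleGrouping("spout");

        // Per-topology overrides of the buffer sizes discussed in this article.
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
        conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);
        conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
        conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);

        StormSubmitter.submitTopology("buffer-tuning-topology", conf, builder.createTopology());
    }
}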
How to configure Storm’s parallelism
The correct configuration of Storm’s message buffers is closely tied to the workload pattern of your topology as well as the configured parallelism of your topologies. See Understanding the Parallelism of a Storm Topology for more details about the latter.
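As a quick reminder of what that parallelism configuration looks like in the Java API (the component names, counts and MySpout/MyBolt classes below are hypothetical choices, not recommendations):

// Sketch: the three levels of parallelism you can tune alongside the buffers.
Config conf = new Config();
conf.setNumWorkers(2); // number of worker processes for the topology

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new MySpout(), 2);   // 2 executors for the spout
builder.setBolt("bolt", new MyBolt(), 4)       // 4 executors for the bolt
       .setNumTasks(8)                         // running 8 tasks in total
       .shuffleGrouping("spout");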
Understand what’s going on in your Storm topology
The Storm UI is a good start to inspect key metrics of your running Storm topologies. For instance, it shows you the so-called “capacity” of a spout/bolt. The various metrics will help you decide whether your changes to the buffer-related configuration parameters described in this article had a positive or negative effect on the performance of your Storm topologies. See Running a Multi-Node Storm Cluster for details.
Apart from that you can also generate your own application metrics and track them with a tool like Graphite. See Installing
and Running Graphite via RPM and Supervisord for details. It might also be worth checking out ooyala’s metrics_storm project on GitHub (I haven’t used it
yet).
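If you want to emit such application-level metrics from your own components, Storm versions from roughly 0.8.2 onward include a simple built-in metrics API that tools like metrics_storm build on. Below is a minimal sketch of a bolt that counts its processed tuples; the metric name and the 60-second reporting interval are arbitrary choices:

import java.util.Map;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

// Sketch of a bolt that tracks a custom application metric via Storm's metrics API.
public class CountingBolt extends BaseRichBolt {
    private transient CountMetric processedCount;
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // Register the metric; values are collected in 60-second buckets.
        processedCount = new CountMetric();
        context.registerMetric("processed_tuples", processedCount, 60);
    }

    @Override
    public void execute(Tuple tuple) {
        processedCount.incr(); // count every tuple this bolt processes
        collector.ack(tuple);  // your business logic would go here
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this sketch emits nothing downstream
    }
}

Note that the collected values only go somewhere useful once a metrics consumer is registered for the topology (e.g. via Config.registerMetricsConsumer in newer Storm versions), which is what tools such as metrics_storm hook into.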
Advice on performance tuning
Watch Nathan Marz’s talk on Tuning and Productionization of Storm.
The TL;DR version is: try the following settings as a first start and see whether they improve the performance of your Storm topology.
conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);
conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);