百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT技术 > 正文

Storm 集群搭建及编写WordCount

wptr33 2025-03-04 14:19 16 浏览

准备工作

1. 下载zookeeper-3.4.7

2. 下载Storm apache-storm-0.9.3

3. 安装JDK 1.7

注:

Storm0.9之前的版本,底层通讯用的是ZeroMQ,所以在安装0.9之前的版本需要安装0MQ,但是在0.9之后 我们直接安装就可以了。

因为在storm被移交到apache之后,这块用java的NIO矿建Netty代替了。

首先建立hadoop用户,我把和大数据相关的框架和软件都放在hadoop用户中。

安装ZK

1. 登陆到10.10.113.41并解压tar包

2. 建立zookeeper的data目录,
/home/hadoop/zookeeper/data

mkdir -p /home/hadoop/zookeeper/data

3. 建立zk集群的myid文件 (单机版可以跳过该步)

cd /home/hadoop/zookeeper/data

echo 1 > myid

4. 拷贝zookeeper的conf/zoo_sample.cfg并重命名为zoo.cfg,修改如下:

dataDir=/home/hadoop/zookeeper/data

server.1=10.10.113.41:2888:3888

server.2=10.10.113.42:2888:3888

server.3=10.10.113.43:2888:3888

dataDir是配置zk的数据目录的

server.A=B:C:D是集群zk使用的。如果你只想用单个zk,可以不配置。

A - 是一个数字,表示这是第几号服务器。与/var/tmp/zkdata下的myid文件内容一致

B - 是该服务器的IP地址

C - 表示该服务器与集群中的Leader服务器交换信息的端口

D - 表示如果万一集群中的Leader服务器挂了,需要各服务器重新选举时所用的通讯端口

5. (Optional)将zk的bin目录路径加入环境变量

修改/etc/profile文件,在尾部添加如下:

#zookeeper

export ZOOKEEPER==/home/hadoop/zookeeper

PATH=$PATH:$ZOOKEEPER/bin

6. 启动zk

zkServer.sh start

在剩下两台机器重复以上步骤,注意myid要对应

6.查看zk的运行状态

zkServer.sh status

安装Storm

1. 解压tar包并赋予执行权限

2. 将Storm的bin目录加入系统路径

修改/etc/profile文件,在尾部加入如下:

PATH=$PATH:/home/hadoop/storm

使其生效

3. 创建一个Storm的本地数据目录

mkdir -p /home/hadoop/storm/data

以上步骤在Storm的集群上的其他机器上重复执行,然后进行配置:

a. 配置storm.yaml

修改storm的conf/storm.yaml文件如下:

storm.zookeeper.servers: #zk地址

- "10.10.113.41"

- "10.10.113.42"

- "10.10.113.43"

nimbus.host: "10.10.113.41" #master 节点地址

supervisor.slots.ports:

- 6700

- 6701

- 6702

- 6703

storm.local.dir: "/home/hadoop/storm/data" #数据存放地址

注意:

在每个配置项前面必须留有空格,否则会无法识别。

启动集群

1. 启动nimbus

在nimbus机器的Storm的bin目录下执行

nohup bin/storm nimbus >/dev/null 2>&1 & #启动主节点

nohup bin/storm ui >/dev/null 2>&1 & #启动stormUI

nohup bin/storm logviewer >/dev/null 2>&1 & #启动logviewer 功能

2. 启动supervisor

在supervisor机器的Storm的bin目录下执行,所有supervisor节点都使用如下命令

nohup bin/storm supervisor >/dev/null 2>&1 &

nohup bin/storm logviewer >/dev/null 2>&1 &

3. 检查

打开Storm UI 页面。
http://10.10.113.41:8080/index.html

默认是启在8080端口上,如果你想改成其他的,如8089,直接修改nimbus的storm.yaml文件,添加

ui.port=8089

部署程序

1. 这里我使用 Intellij IDEA + maven来开发一个wordcount的Demo

2. 添加maven依赖

org.apache.storm

storm-core

0.9.3<

1. 这里我使用 Intellij IDEA + maven来开发一个wordcount的Demo部署程序

2. 添加maven依赖

org.apache.storm

storm-core

0.9.3

3. 新建项目,编写程序

package cn.oraclers.storm;

import backtype.storm.Config;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.topology.base.BaseRichBolt;

import backtype.storm.topology.base.BaseRichSpout;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

import backtype.storm.utils.Utils;

import java.util.HashMap;

import java.util.Map;

import java.util.Random;

public class WordCount {

public static class SpoutSource extends BaseRichSpout {

Map map;

TopologyContext topologyContext;

SpoutOutputCollector spoutOutputCollector;

Random random;

@Override

public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {

map = map;

topologyContext = topologyContext;

spoutOutputCollector = spoutOutputCollector;

random = random;

}

String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",

"four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };

@Override

public void nextTuple() {

Utils.sleep(1000);

for (String sentence:sentences){

spoutOutputCollector.emit(new Values(sentence));

}

}

@Override

public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

outputFieldsDeclarer.declare(new Fields("sentence"));

}

}

public static class SplitBoltSource extends BaseRichBolt{

Map map;

TopologyContext topologyContext;

OutputCollector outputCollector;

@Override

public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {

map = map;

topologyContext = topologyContext;

outputCollector = outputCollector;

}

@Override

public void execute(Tuple tuple) {

String sentence = tuple.getStringByField("sentence");

String[] words = sentence.split(" ");

for (String word:words){

this.outputCollector.emit(new Values(word));

}

}

@Override

public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

outputFieldsDeclarer.declare(new Fields("word"));

}

}

public static class SumBoltSource extends BaseRichBolt{

Map map;

TopologyContext topologyContext;

OutputCollector outputCollector;

@Override

public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {

this.map = map;

this.topologyContext = topologyContext;

this.outputCollector = outputCollector;

}

Map mapCount = new HashMap();

@Override

public void execute(Tuple tuple) {

String word = tuple.getStringByField("word");

Integer count = mapCount.get(word);

if(count == null){

count=0;

}

count++;

mapCount.put(word,count);

outputCollector.emit(new Values(word,count));

}

@Override

public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

outputFieldsDeclarer.declare(new Fields("word", "count"));

}

}

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("data_source",new SpoutSource());

builder.setBolt("bolt_split",new SplitBoltSource()).shuffleGrouping("data_source");

builder.setBolt("bolt_sum",new SplitBoltSource()).fieldsGrouping("bolt_split",new Fields("word"));

try {

Config stormConf = new Config();

stormConf.setDebug(true);

StormSubmitter.submitTopology("Clustertopology", stormConf,builder.createTopology());

} catch (AlreadyAliveException e) {

e.printStackTrace();

} catch (InvalidTopologyException e) {

e.printStackTrace();

}

}

}

4. 打包部署topology

./storm jar storm jar sd-1.0-SNAPSHOT.jar cn.oraclers.storm.WordCount

5. 查看结果

两种方式,

a. 查看StormUI

注意:一定不要使用IE内核的浏览器,否则看不到Topology Summary 下面的东西!!!

b. storm的bin目录下运行

Topology_name Status Num_tasks Num_workers Uptime_secs

-------------------------------------------------------------------

test ACTIVE 28 3 5254

Clustertopology ACTIVE 4 1 83

mytopo ACTIVE 6 3 555

6. 关闭topology

a. StormUI上面点选要关闭的topology,如test,然后在新页面的Topology actions中选kill

b. 运行./storm kill test

相关推荐

MySQL进阶五之自动读写分离mysql-proxy

自动读写分离目前,大量现网用户的业务场景中存在读多写少、业务负载无法预测等情况,在有大量读请求的应用场景下,单个实例可能无法承受读取压力,甚至会对业务产生影响。为了实现读取能力的弹性扩展,分担数据库压...

Postgres vs MySQL_vs2022连接mysql数据库

...

3分钟短文 | Laravel SQL筛选两个日期之间的记录,怎么写?

引言今天说一个细分的需求,在模型中,或者使用laravel提供的EloquentORM功能,构造查询语句时,返回位于两个指定的日期之间的条目。应该怎么写?本文通过几个例子,为大家梳理一下。学习时...

一文由浅入深带你完全掌握MySQL的锁机制原理与应用

本文将跟大家聊聊InnoDB的锁。本文比较长,包括一条SQL是如何加锁的,一些加锁规则、如何分析和解决死锁问题等内容,建议耐心读完,肯定对大家有帮助的。为什么需要加锁呢?...

验证Mysql中联合索引的最左匹配原则

后端面试中一定是必问mysql的,在以往的面试中好几个面试官都反馈我Mysql基础不行,今天来着重复习一下自己的弱点知识。在Mysql调优中索引优化又是非常重要的方法,不管公司的大小只要后端项目中用到...

MySQL索引解析(联合索引/最左前缀/覆盖索引/索引下推)

目录1.索引基础...

你会看 MySQL 的执行计划(EXPLAIN)吗?

SQL执行太慢怎么办?我们通常会使用EXPLAIN命令来查看SQL的执行计划,然后根据执行计划找出问题所在并进行优化。用法简介...

MySQL 从入门到精通(四)之索引结构

索引概述索引(index),是帮助MySQL高效获取数据的数据结构(有序),在数据之外,数据库系统还维护者满足特定查询算法的数据结构,这些数据结构以某种方式引用(指向)数据,这样就可以在这些数据结构...

mysql总结——面试中最常问到的知识点

mysql作为开源数据库中的榜一大哥,一直是面试官们考察的重中之重。今天,我们来总结一下mysql的知识点,供大家复习参照,看完这些知识点,再加上一些边角细节,基本上能够应付大多mysql相关面试了(...

mysql总结——面试中最常问到的知识点(2)

首先我们回顾一下上篇内容,主要复习了索引,事务,锁,以及SQL优化的工具。本篇文章接着写后面的内容。性能优化索引优化,SQL中索引的相关优化主要有以下几个方面:最好是全匹配。如果是联合索引的话,遵循最...

MySQL基础全知全解!超详细无废话!轻松上手~

本期内容提醒:全篇2300+字,篇幅较长,可搭配饭菜一同“食”用,全篇无废话(除了这句),干货满满,可收藏供后期反复观看。注:MySQL中语法不区分大小写,本篇中...

深入剖析 MySQL 中的锁机制原理_mysql 锁详解

在互联网软件开发领域,MySQL作为一款广泛应用的关系型数据库管理系统,其锁机制在保障数据一致性和实现并发控制方面扮演着举足轻重的角色。对于互联网软件开发人员而言,深入理解MySQL的锁机制原理...

Java 与 MySQL 性能优化:MySQL分区表设计与性能优化全解析

引言在数据库管理领域,随着数据量的不断增长,如何高效地管理和操作数据成为了一个关键问题。MySQL分区表作为一种有效的数据管理技术,能够将大型表划分为多个更小、更易管理的分区,从而提升数据库的性能和可...

MySQL基础篇:DQL数据查询操作_mysql 查

一、基础查询DQL基础查询语法SELECT字段列表FROM表名列表WHERE条件列表GROUPBY分组字段列表HAVING分组后条件列表ORDERBY排序字段列表LIMIT...

MySql:索引的基本使用_mysql索引的使用和原理

一、索引基础概念1.什么是索引?索引是数据库表的特殊数据结构(通常是B+树),用于...