分类 Hadoop 下的文章

Hadoop集群上使用Lzo压缩

自从Hadoop集群搭建以来,我们一直使用的是Gzip进行压缩

当时,我对gzip压缩过的文件和原始的log文件分别跑MapReduce测试,最终执行速度基本差不多

而且Hadoop原生支持Gzip解压,所以,当时就直接采用了Gzip压缩的方式

关于Lzo压缩,twitter有一篇文章,介绍的比较详细,见这里

Lzo压缩相比Gzip压缩,有如下特点:

  1. 压缩解压的速度很快
  2. Lzo压缩是基于Block分块的,这样,一个大的文件(在Hadoop上可能会占用多个Block块),就可以由多个MapReduce并行来进行处理

虽然Lzo的压缩比没有Gzip高,不过由于其前2个特性,在Hadoop上使用Lzo还是能整体提升集群的性能的

我测试了12个log文件,总大小为8.4G,以下是Gzip和Lzo压缩的结果:

  1. Gzip压缩,耗时480s,Gunzip解压,耗时180s,压缩后大小为2.5G
  2. Lzo压缩,耗时160s,Lzop解压,耗时110s,压缩后大小为4G

以下为在Hadoop集群上使用Lzo的步骤:

1. 在集群的所有节点上安装Lzo库,可从这里下载

cd /opt/ysz/src/lzo-2.04

./configure --enable-shared

make

make install

#编辑/etc/ld.so.conf,加入/usr/local/lib/后,执行/sbin/ldconfig

或者cp /usr/local/lib/liblzo2.* /usr/lib64/

#如果没有这一步,最终会导致以下错误:

lzo.LzoCompressor: java.lang.UnsatisfiedLinkError: Cannot load liblzo2.so.2 (liblzo2.so.2: cannot open shared object file: No such file or directory)!

2. 编译安装Hadoop Lzo本地库以及Jar包,从这里下载

    export CFLAGS=-m64

    export CXXFLAGS=-m64

    ant compile-native tar

    #将本地库以及Jar包拷贝到hadoop对应的目录下,并分发到各节点上

    cp lib/native/Linux-amd64-64/* /opt/sohuhadoop/hadoop/lib/native/Linux-amd64-64/

    cp hadoop-lzo-0.4.10.jar /opt/sohuhadoop/hadoop/lib/

3. 设置Hadoop,启用Lzo压缩

vi core-site.xml

<property>     

<name>io.compression.codecs</name>     

<value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,org.apache.hadoop.io.compress.BZip2Codec</value>

</property>

<property>     

<name>io.compression.codec.lzo.class</name>     

<value>com.hadoop.compression.lzo.LzoCodec</value>

</property>

 

vi mapred-site.xml

<property>

<name>mapred.compress.map.output</name>     

<value>true</value>   

</property>   

<property>     

<name>mapred.map.output.compression.codec</name>      

<value>com.hadoop.compression.lzo.LzoCodec</value>   

</property>

4. 安装lzop,从这里下载

 

下面就是使用lzop压缩log文件,并上传到Hadoop上,执行MapReduce操作,测试的Hadoop是由3个节点组成集群

lzop -v 2011041309.log

hadoop fs -put *.lzo /user/pvlog

#给Lzo文件建立Index

hadoop jar /opt/sohuhadoop/hadoop/lib/hadoop-lzo-0.4.10.jar com.hadoop.compression.lzo.LzoIndexer /user/pvlog/

写一个简单的MapReduce来测试,需要指定InputForamt为Lzo格式,否则对单个Lzo文件仍不能进行Map的并行处理

job.setInputFormatClass(com.hadoop.mapreduce.LzoTextInputFormat.class);

可以通过下面的代码来设置Reduce的数目:

job.setNumReduceTasks(8);

最终,12个文件被切分成了36个Map任务来并行处理,执行时间为52s,如下图:

我们配置Hadoop默认的Block大小是128M,如果我们想切分成更多的Map任务,可以通过设置其最大的SplitSize来完成:

FileInputFormat.setMaxInputSplitSize(job, 64 *1024 * 1024);

最终,12个文件被切分成了72个Map来处理,但处理时间反而长了,为59s,如下图:

而对于Gzip压缩的文件,即使我们设置了setMaxInputSplitSize,最终的Map数仍然是输入文件的数目12,执行时间为78s,如下图:

从以上的简单测试可以看出,使用Lzo压缩,性能确实比Gzip压缩要好不少

HBase的性能优化和相关测试

HBase的写效率还是很高的,但其随机读取效率并不高

可以采取一些优化措施来提高其性能,如:

1. 启用lzo压缩,见这里

2. 增大hbase.regionserver.handler.count数为100

3. 增大hfile.block.cache.size为0.4,提高cache大小

4. 增大hbase.hstore.blockingStoreFiles为15

5. 启用BloomFilter,在HBase0,89中可以设置

6.Put时可以设置setAutoFlush为false,到一定数目后再flushCommits

 

在14个Region Server的集群上,新建立一个lzo压缩表

测试的Put和Get的性能如下:

1. Put数据:

单线程灌入1.4亿数据,共花费50分钟,每秒能达到4万个,这个性能确实很好了,不过插入的value比较小,只有不到几十个字节

多线程put,没有测试,因为单线程的效率已经相当高了

2. Get数据:

在没有任何Block Cache,而且是Random Read的情况:

单线程平均每秒只能到250个左右

6个线程平均每秒能达到1100个左右

16个线程平均每秒能达到2500个左右

有BlockCache(曾经get过对应的row,而且还在cache中)的情况:

单线程平均每秒能到3600个左右

6个线程平均每秒能达到1.2万个左右

16个线程平均每秒能达到2.5万个左右

Hadoop集群中增加新节点

向一个正在运行的Hadoop集群中增加几个新的Nodes

1. 新节点上部署java/hadoop程序,配置相应的环境变量

2. 新节点上增加用户,从master上拷贝id_rsa.pub并配置authorized_keys

3. 新节点上设置host,需要有集群中各节点的host对应

4. 新节点上建立相关的目录,并修改属主

5. master的slaves文件中增加上相的节点,master上增加相应的host

6. 在新节点上启动datanode和tasktracker

/opt/sohuhadoop/hadoop/bin/hadoop-daemon.sh start datanode
/opt/sohuhadoop/hadoop/bin/hadoop-daemon.sh start tasktracker

7. 进行block块的均衡

在hdfs-site.xml中增加设置balance的带宽,默认只有1M:

<property>
    <name>dfs.balance.bandwidthPerSec</name>
    <value>10485760</value>
    <description>
        Specifies the maximum bandwidth that each datanode can utilize for the balancing purpose in term of the number of bytes per second.
    </description>
</property>

运行以下命令:

/opt/sohuhadoop/hadoop/bin/start-balancer.sh -threshold 3

均衡10个节点,移动400G数据,大概花费了3个小时

The cluster is balanced. Exiting...
Balancing took 2.9950980555555557 hours

利用Decommission从Hadoop集群中Remove节点

我们现有的Hadoop集群已经运行了一段时间了

由于集群中的服务器分布在2个不同的机房,受跨机房带宽的限制

集群中在2个机房之间的数据传输很慢

所以想把另一个机房的3台服务器从Hadoop集群中去掉

Hadoop提供了Decommission的特性,可以按照以下步骤来操作:

1. 在hadoop的conf目录下生成一个excludes的文件,写上需要remove的节点ip

    一个节点一行,注意要写ip,不能写Hostname,如:

10.15.10.41
10.15.10.42
10.15.10.43

2. 在hdfs-site.xml中增加配置:

<property>   
    <name>dfs.hosts.exclude</name>   
    <value>/opt/sohuhadoop/conf/excludes</value>   
    <final>true</final>
</property>

3. 复制以上2个文件到集群各节点上

4. 执行hadoop dfsadmin -refreshNodes命令,它会在后台进行Block块的移动

    从移出的Nodes上移动到其它的Nodes上面

5. 通过以下2种方式查看Decommission的状态:

    hadoop dfsadmin -report

    http://10.10.71.220:50070/dfsnodelist.jsp

    正在执行Decommission,会显示:

    Decommission Status : Decommission in progress

    执行完毕后,会显示:

    Decommission Status : Decommissioned

Hbase Shell的常用命令

总结的一些Hbase shell的命令

都很简单,可以help来查看帮助

create 'user_test','info'
describe 'user_test'
disable 'user_testinfo'
drop 'user_testinfo'
put 'user_test','test-1','info:username','test1'
put 'user_test','test-1','info','1'
put 'user_test','test-2','info:creattime','20101116'
list
scan 'user_test'
#可以指定startrow,stoprow来scan多个row
scan 'user_test',{COLUMNS =>'info:username',LIMIT =>10, STARTROW => 'test',STOPROW=>'test2'}
get 'user_test','test-1'
get 'user_test','test-1',{COLUMN => 'info:username'}
#删除某行的某一列,比如username那列的值
delete 'user_test','test-1','info:username'
#删除某行所有的列
deleteall 'user_test','test-2'     
count 'user_test'
status 'detailed'

基于Hadoop的Hbase环境搭建

基于现有的Hadoop集群,来搭建Hbase的环境

整个过程还是比较简单的

1. 下载Hbase源码,并解压

cp hbase-0.20.6.tar.gz /opt/hadoop/
cd /opt/hadoop/
tar zxvf hbase-0.20.6.tar.gz
ln -s hbase-0.20.6 hbase

2.修改hbase-env.sh,加入java环境,并修改log位置

export JAVA_HOME=/opt/java/jdk
export HBASE_LOG_DIR=/opt/log/hbase
export HBASE_MANAGES_ZK=true

3. 修改hbase-site.xml,配置hbase

<property>
    <name>hbase.rootdir</name>
    <value>hdfs://zw-hadoop-master:9000/hbase</value>
    <description>The directory shared by region servers.</description>
</property>
<property>
            <name>hbase.cluster.distributed</name>
            <value>true</value>
            <description>The mode the cluster will be in. Possible values are
              false: standalone and pseudo-distributed setups with managed Zookeeper
              true: fully-distributed with unmanaged Zookeeper Quorum (see hbase-env.sh)
            </description>
    </property>
<property> 
    <name>hbase.master</name> 
    <value>hdfs://zw-hadoop-master:60000</value> 
</property>
<property>
    <name>hbase.zookeeper.quorum</name>       
    <value>zw-hadoop-slave225,zw-hadoop-slave226,zw-hadoop-slave227</value>
    <description>Comma separated list of servers in the ZooKeeper Quorum.      For example, "host1.mydomain.com,host2.mydomain.com,host3.mydomain.com".      By default this is set to localhost for local and pseudo-distributed modes      of operation. For a fully-distributed setup, this should be set to a full      list of ZooKeeper quorum servers. If HBASE_MANAGES_ZK is set in hbase-env.sh      this is the list of servers which we will start/stop ZooKeeper on.     
    </description>
</property>
<property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/opt/log/zookeeper</value>
    <description>Property from ZooKeeper's config zoo.cfg.
        The directory where the snapshot is stored.
    </description>
</property>

几个配置的说明:

  • hbase.rootdir设置hbase在hdfs上的目录,主机名为hdfs的namenode节点所在的主机
  • hbase.cluster.distributed设置为true,表明是完全分布式的hbase集群
  • hbase.master设置hbase的master主机名和端口
  • hbase.zookeeper.quorum设置zookeeper的主机,官方推荐设置为3,5,7比较好

4. 编辑regionservers文件,设置regionservers的服务器,和hadoop的slaves一样即可

5. 启动Hbase

/opt/sohuhadoop/hbase/bin/start-hbase.sh
/opt/sohuhadoop/hbase/bin/stop-hbase.sh

Hbase默认只有一个Master,我们可以也启动多个Master:

/opt/sohuhadoop/hbase/bin/hbase-daemon.sh start master

不过,其它的Master并不会工作,只有当主Master down掉后

其它的Master才会选择接管Master的工作

Hbase也有一个简单的web界面,来查看其状态

http://10.10.71.1:60010/master.jsp
http://10.10.71.1:60030/regionserver.jsp
http://10.10.71.1:60010/zk.jsp

Hadoop集群的NameNode的备份

Hadoop集群中,NameNode节点存储着HDFS上所有文件和目录的元数据信息

如果NameNode挂了,也就意味着整个Hadoop集群也就完了

所以,NameNode节点的备份很重要,可以从以下2个方面来备份NameNode节点

1. 在hdfs-site.xml中,配置多个name的dir到不同的磁盘分区上:

<property>
    <name>dfs.name.dir</name>
    <value>/pvdata/hadoopdata/name/,/opt/hadoopdata/name/</value>
</property>

2. 在另外的一台服务器上配置Secondary NameNode:它是NameNode的一个备份

Secondary NameNode会定期合并fsimage和edits日志,将edits日志文件大小控制在一个限度下

合并的时机是由2个配置参数决定的:

fs.checkpoint.period,指定连续两次检查点的最大时间间隔, 默认值是1小时。
fs.checkpoint.size定义了edits日志文件的最大值,一旦超过这个值会导致强制执行检查点(即使没到检查点的最大时间间隔)。默认值是64MB。

Secondary NameNode的配置过程如下:

  • conf/masters中指定第二名称节点的主机名
  • 在core-site.xml中指定checkpoint的目录

<property>
  <name>fs.checkpoint.dir</name>
  <value>/opt/hadoopdata/secondname,/pvdata/hadoopdata/secondname</value>
  <description>Determines where on the local filesystem the DFS secondary
      name node should store the temporary images to merge.
      If this is a comma-delimited list of directories then the image is
      replicated in all of the directories for redundancy.
  </description>
</property>

如果NameNode节点挂了,可以按照如下步骤来从Secondary NameNode来恢复:

  • dfs.name.dir指定的位置建立一个空文件夹
  • Secondary NameNode上把secondname的目录给scp到新的NameNode机器的fs.checkpoint.dir下
  • 使用hadoop/bin/hadoop namenode -importCheckpoint来启动NameNode,主要不要执行format命令
  • 使用hadoop fsck /user命令检查文件Block的完整性

详细的Secondary NameNode细节可参考Hadoop官方文档:

http://hadoop.apache.org/common/docs/r0.20.2/hdfs_user_guide.html#Secondary+NameNode

惊天大悲剧-Hadoop的rmr和trash

这两天在操作Hadoop集群时,由于一个误操作,制作了一个天大的悲剧

不小心把Hadoop集群上的所有文件全部删除了,具体情况是这样的:

我用hadoop的超级帐户要建立一个目录,结果发现位置错了

也是,想使用rmr删掉那个目录,可是不小心把命令写成了

hadoop fs -rmr /user

于是,悲剧出现了,所有user目录下的所有目录和文件全都没有了

当时我就慌神了,赶紧从web查看50070的服务

眼看着DFS Used空间从100多G不停的减少

后来才反应过来,赶紧停掉namenode节点,然后上网google办法

后来,从secondname节点重新恢复了一个checkpoint

但绝大部分数据都已经丢失了,只恢复了一小部分数据,已经没啥用了

幸好,原始log我们在其它服务器上还保留的有,只能重新分析再入Hadoop了

总结了一下几点教训:

  1. 首先一定要控制好hadoop上各用户的权限,使各user只能操作自己的目录
  2. 尽量少用hadoop的超级用户进行操作,可以减少误操作
  3. hadoop的rm和rmr命令,设计的太BT了,连一个确认提示都没有,直接就删除了。看到有人给官方提了这个建议,但人家回复说:已经有了trash机制了,所以不需要提示,真是无语....
  4. hadoop的trash功能:很遗憾,之前没有配置trash,所以就直接给删除了,经过这次误操作,赶紧配置上trash,并设置保留时间为7天。

在core-site.xml中增加如下配置,表明rm后会在trash中保留多少分钟:

<property>
  <name>fs.trash.interval</name>
  <value>10080</value>
  <description>
      Number of minutes between trash checkpoints. If zero, the trash feature is disabled
  </description>
</property>

很遗憾的是,hadoop的这个默认值是0,就是直接删除了,为什么要这么设计呢?郁闷....

经过简单的测试,这个trash功能还是不错的,当rm后,它会move到当前文件夹下的.Trash目录下

如果你删除一个文件或目录多次,则hadoop会自动在name后加上数字序列号

这样,如果你误删除后,就可以有选择的恢复文件了

hadoop fs -mkdir /user/oplog/test
hadoop fs -put *.txt /user/oplog/test
hadoop fs -rmr /user/oplog/test
hadoop fs -ls /user/oplog/.Trash/Current/user/oplog
    drwxr-xr-x   - oplog oplog          0 2010-11-16 10:44 /user/oplog/.Trash/Current/user/oplog/test
hadoop fs -mv /user/oplog/.Trash/Current/user/oplog/test /user/oplog/
hadoop fs -ls /user/oplog/.Trash/Current/user/oplog
    drwxr-xr-x   - oplog oplog          0 2010-11-16 10:44 /user/oplog/.Trash/Current/user/oplog/test
    drwxr-xr-x   - oplog oplog          0 2010-11-16 10:47 /user/oplog/.Trash/Current/user/oplog/test.1

Hive中实现自定义函数UDF

Hive的UDF,其实很类似Mysql之类的自定义函数

不过它需要用java来编写,而不是用传统的SQL来完成

实现一个UDF的步骤如下:

  1. 实现一个Java Class,继承自UDF
  2. 打成jar包,并加入到Hive的ClassPath中
  3. 生成自定义函数,执行select
  4. 删除刚才创建的临时函数

下面这个UDF,是我给hive的array增加的一个函数

用来判断array中是否包含某个值,hive的标准函数中并没有此功能函数

package com.sohu.hadoop.hive.udf;
import java.util.*;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.Text;

public final class ArrayContains extends UDF {

  public BooleanWritable evaluate(ArrayList<String> arr,Text ele)
    {
        BooleanWritable rtn = new BooleanWritable(false);
        if (arr == null || arr.size() < 1)
        {
            return rtn;
        }
        try {
            String cstr = ele.toString();   
            for (String str : arr)
            {
                if (str.equals(cstr))
                {
                    rtn = new BooleanWritable(true);
                    break;
                }
            }
           
        } catch (Exception e) {
            e.printStackTrace();
        }
       
        return rtn;
    }
}

然后执行编译打包:

javac -classpath /opt/hadoop_client/hadoop/hadoop-0.20.2+228-core.jar:/opt/hadoop_client/hive/lib/hive-exec-0.5.0.jar src/com/sohu/hadoop/hive/udf/ArrayContains.java -d build
jar -cvf hadooop-mc-udf.jar -C build .

最后执行Hive QL查询:

hive -e "add jar /opt/ysz/udf/hadooop-mc-udf.jar;drop temporary function array_contains;create temporary function array_contains as 'com.sohu.hadoop.hive.udf.ArrayContains';select suv,channelid from pvlog_pre where array_contains(channelid,'2')"

hadoop中mapred.tasktracker.map.tasks.maximum的设置

目前,我们邮件的一部分log已经迁移到Hadoop集群上

并由Hive来执行相关的查询

hadoop中默认的mapred.tasktracker.map.tasks.maximum设置是2

也即:每一个tasktracker同时运行的map任务数为2

照此默认设置,查询80天某用户的操作日志,耗时5mins, 45sec

经过测试,发现将mapred.tasktracker.map.tasks.maximum设置为节点的cpu cores数目或者数目减1比较合适

此时的运行效率最高,大概花费3mins, 25sec

我们现在的机器都是8核的,所以最终配置如下:

<property>
    <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>8</value>
    <description>The maximum number of map tasks that will be run
    simultaneously by a task tracker.
    </description>
</property>

而对于mapred.map.tasks(每个job的map任务数)值,hadoop默认值也为2

可以在执行hive前,通过set mapred.map.tasks=24来设定

但由于使用hive,会操作多个input文件,所以hive默认会把map的任务数设置成输入的文件数目

即使你通过set设置了数目,也不起作用...

最新文章

最近回复

  • feifei435:这两个URI实际是不一样的
  • zsy: git push origin 分支 -f 给力!
  • 冼敏兵:简单易懂,good fit
  • Jack:无需改配置文件,看着累! # gluster volume se...
  • Mr.j:按照你的方法凑效了,折腾死了。。。。
  • zheyemaster:补充一句:我的网站路径:D:\wamp\www ~~菜鸟站长, ...
  • zheyemaster:wamp2.5(apache2.4.9)下局域网访问403错误的...
  • Git中pull对比fetch和merge | 炼似春秋:[…] 首先,我搜索了git pull和git fe...
  • higkoo:总结一下吧, 性能调优示例: gluster volume s...
  • knowaeap:请问一下博主,你维护的openyoudao支持opensuse吗

分类

归档

其它