首页 › 月度存档 › 3 月 2014

运行C++ hadoop程序时如何设置job参数

以pipe方式运行C++ hadoop程序时,如果想要动态设置job的参数(例如设置job name),不像用Java那么方便(可能是我没有找到对应的函数),因为在头文件 Pipes.hh 中,虽然有 JobConf 这样一个类,却没有提供相应的set方法,而只有get方法。

以设置job name为例:

在hadoop JobTracker中,当前正在运行的程序会有一个名字,即job name,如果你不在程序中指定,那么它会就使用默认名字,通常默认名字是没有太多含义的,为了便于区分,我们可以在程序中设置它。
用Java写hadoop程序时,可以用Job类的setJobName(String jobName)方法来指定,非常简单。
用C++写hadoop程序时,怎么办?如前文所述,可能是我没找到门道,在Pipes.hh中我没看到可以set属性的函数。
于是可以用另一种方法:在程序运行的时候,通过命令行传入该属性值。
先说一下,job的一些属性是在hadoop目录下的 conf/mapred-site.xml 文件中定义的,此文件中包含了很多相当于 key/value 对的东西,key就是属性名,value就是属性值,如果此文件中没有定义的属性,那么就使用其默认值。知道了这一点,我们就可以复制一份 mapred-site.xml 文件(最好不要修改原来的文件,因为我们只是临时用一用),然后向复制出来的xml文件(假设其文件名为my.xml)添加如下内容:
mapred.job.name
MySpecialName

其中 mapred.job.name 是job name对应的属性名,MySpecialName是你要设置的job name,随你写。

有了这个文件,我们就可以这样来运行程序了:

hadoop pipes -conf my.xml -D hadoop.pipes.java.recordreader=true -D hadoop.pipes.java.recordwriter=true -input /data/ -output /my_dir/output -program /my_dir/dz_count

程序运行起来之后,你再到JobTracker中看一看,job name是不是成了“MySpecialName”?这样就解决了我们的问题。

对其他job参数,你也可以这样做。

有人说:你这哪是动态设置的,这明明是用写好的文件来传入的参数。其实不然:用C++程序动态生成一个xml文件或者修改xml文件的内容是很简单的事情,所以可以先用shell脚本调用程序,生成xml,紧接着shell脚本再带-conf参数运行C++ hadoop程序,就可以完成任务了,这也算是动态指定吧?不过整个过程是比直接使用Java写程序要麻烦。你要是知道简单的办法,请一定要留言告诉我。

注:本文基于hadoop版本0.20.2。

从零开始一步步教你:用C++开发一个简单的hadoop分布式计算应用程序

假设你有上百G的数据,你要统计出这些数据中,含有某些你感兴趣的内容的数据的有多少条,你会怎么做?在硬件条件允许的情况下,用hadoop并行计算是一个不错的选择。

为了使本文得以清晰地说明,我们不妨假设如下的情况:
我们有100G的数据,分别保存在5个文件中,它们位于 /data/ 目录下。这5个数据文件的内容均为相同的格式,即,文件的内容大致如下:

ABCDSDFKJDKF kkk 2890876
SDKFJIEORUEW nnn 1231238
LSFKDFSSDRDE bbb 9234999

说明:每一行内容中,首先是一个12字节的字符串,然后是一个3字节的字符串,然后是一个7个数字组成的字符串。字符串之间是用空格分隔的。
我们的问题是:在这100G的数据中,请统计出第二项字符串为“kkk”和“nnn”的数据分别有多少条?
如果用一个非分布式的应用程序来计算这个问题,如果计算机硬件配置不够强劲的话,那么估计得算到天荒地老了。
而用hadoop来并行计算,一切都是那么简单。
下面,我们就来看看,如何用C++开发一个hadoop上的应用程序,来完成我们的任务。
尽管hadoop平台是用Java写的,但是它仍然支持用C++来开发应用程序,这里不讨论优劣对比,只是基于这样一个事实:有些人觉得用C++写更熟悉,所以我们才用C++写。
先说明:本文基于hadoop 0.20.2版本。
(1)首先我们需要知道map-reduce的基本原理,这里不说了。其次我们需要知道,在用C++编写hadoop应用程序时,需要包含三个头文件:

#include "Pipes.hh"
#include "TemplateFactory.hh"
#include "StringUtils.hh"

这三个文件在hadoop安装包的 “c++\Linux-amd64-64\include\” 或 “c++\Linux-i386-32\include\” 子目录下(根据你的操作系统是64位或32位,分别对应不同的目录)。

既然有头文件,就需要有对应的实现文件,或者动态/静态库,这里我用的是静态库 libhadooppipes.a 和 libhadooputils.a 。静态库是在Makefile中指定的,后面再说。这里特别提醒一下大家:如果你的hadoop集群不是只有一台服务器,那么如果你编译时使用了任何动态库的话,在运行的时候就要保证在别的hadoop服务器上也能找到相应的动态库,否则就会在hadoop JobTracker的详细信息中看到找不到动态库的错误提示。

(2)下面来看看程序:

#include"Pipes.hh"
#include"TemplateFactory.hh"
#include"StringUtils.hh"

class DataCountMap:public HadoopPipes::Mapper {
public:
DataCountMap(HadoopPipes::TaskContext&context){}
void map(HadoopPipes::MapContext&context) {
std::vectorwords=HadoopUtils::splitString(context.getInputValue()," "); // 这里是分割字符串,如前文所述,每一行数据中的各项是以空格来分割的。分割的结果保存到了一个std::vector中
if("kkk"==words[1]) {
context.emit("kkk","1");
} else if("nnn"==words[1]) {
context.emit("nnn","1");
}
}
};

class DataCountReduce:public HadoopPipes::Reducer {
public:
DataCountReduce(HadoopPipes::TaskContext&context){}
void reduce(HadoopPipes::ReduceContext&context)
{
int sum=0;
while(context.nextValue()) {
sum++;
}
context.emit(context.getInputKey(),HadoopUtils::toString(sum));
}
};

int main(int argc,char*argv[])
{
return HadoopPipes::runTask(HadoopPipes::TemplateFactory());
}

上面的程序挺简单的,只要你知道了map-reduce的基本原理。
一个map类,一个reduce类,一个执行任务的main函数。
map类对每一行数据进行拆分,当找到我们感兴趣的“kkk”或“nnn”时,就生成一条输出的记录(emit函数的作用);recude类对map的数据进行汇总,这里只是简单地计数,所以每次+1。

(3)有了代码,我们接着就要编写相应的Makefile了。我的Makefile如下:

HADOOP_INSTALL = /usr/local/hadoop
INCLUDE_PATH = $(HADOOP_INSTALL)/src/c++/
CC = g++
CXXFLAGS = -Wall -g \
-I${INCLUDE_PATH}pipes/api/hadoop \
-I${INCLUDE_PATH}utils/api/hadoop
LDFLAGS = -ljvm -lhadooppipes -lhadooputils -lpthread
OBJECTS = dz_count.o
dz_count: $(OBJECTS)
$(CC) $(CXXFLAGS) -o $@ $(OBJECTS) $(LDFLAGS)

其中,HADOOP_INSTALL是你的hadoop安装路径,其余的 INCLUDE_PATH 等请对照你的目录做相应更改,最后生成的可执行程序名为dz_count。这里没有考虑release,因为仅作简单的说明用。

(4)有了代码和Makefile,就可以编译了。编译得到可执行程序dz_count。将其上传到hdfs中:

hadoop fs -put dz_count /my_dir/

其中 “/my_dir/” 是你在hdfs中的目录。

(5)下面就可以运行我们的hadoop程序了:

hadoop fs -cat /my_dir/output/part-00000

输出结果形为:
kkk 178099387
nnn 678219805

表明第二项为“kkk”的数据行共有178099387条,而“nnn”则为678219805条。

顺便再说一点废话:

(1)如何中止一个hadoop任务?当你在命令行下提交了一个hadoop job后,就算你按Ctrl+C,也不能中止掉那个job,因为它已经被Jobtracker接管了。这时,你要用如下命令中止它:

hadoop job -kill Job_ID

其中,Job_ID就是你提交的job的ID,可以在Jobtracker中查看到。

(2)一些基本概念:

map-reduce过程中,在map时,hadoop会将输入的数据按一定的大小(例如100M,这个值是可以配置的)分为若干块来处理,一个块对应一个map类,也就是说,一个块只会执行map类的构造函数一次。而每一行记录则对应一个map()方法,也就是说,一行记录就会执行一次map()方法。因此,如果你有什么信息需要输出(例如std::cout)的话,就要注意了:如果在map()方法中输出,则当输入数据量很大时,可能就会输出太多的信息,如果可以在map的构造函数中输出的话,则输出的信息会少得多。

在reduce时,对map输出的同一个key,有一个reduce类,也就是说,无论你的同一个key有多少个value,在reduce的时候只要是同一个key,就会出现在同一个reduce类里,在这个类里的reduce方法中,你用 while (context.nextValue()) 循环可以遍历所有的value,这样就可以处理同一个key的N个value了。
正因为在默认情况下,相同key的记录会落到同一个reducer中,所以,当你的key的数量比你设置的reducer的数量要少的时候,就导致了某些reducer分配不到任何数据,最终输出的某些文件(part-r-xxxxx)是空文件。如果你设置的reducer数量要少于key的数量(这是最常见的情况),那么就会有多个key落入同一个reducer中被处理,但是,每一次reduce()方法被调用时,其中将只包含一个key,同一个reducer里的多个key就会导致reduce()方法被多次调用。

这样,我们就完成了一个完整的C++ hadoop分布式应用程序的编写。

hadoop入门–通过java代码实现从本地的文件上传到Hadoop的文件系统

第一步:首先搭建java的编译环境。创建一个Java Project工程,名为upload。

第二步:选中所需的Jar包。
选中JRE System Library 选择BuildPath Configure Build Path  选择hadoop相应的jar包。
通过Add External JARS –〉Hadoop-0.20.2下所有的jar包以及lib下所有的jar包。OK。操作步骤如图:

Hadoop-0.20.2下所有的jar包。

lib下的所有jar包。

 

第三步:创建class 名为UploadFile.

 

第四步:编写代码。

 


package updatefile;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class updateFile {

public static void main(String[] args) {
try {
String localStr = args[0];
File file=new File(localStr);
String fileName = file.getName();
String dst = args[1] + '/' + fileName;
//in对应的是本地文件系统的目录
InputStream in = new BufferedInputStream(new FileInputStream(localStr));
Configuration conf = new Configuration();
//获得hadoop系统的连接
FileSystem fs = FileSystem.get(URI.create(dst),conf);

//out对应的是Hadoop文件系统中的目录
OutputStream out = fs.create(new Path(dst));
IOUtils.copyBytes(in, out, 4096,true);//4096是4k字节
System.out.println("success");
} catch (Exception e) {
System.out.println(e.toString());
}
}
}

 

 


 

第五步:在Run Configuration里面设置两个参数,并执行。

点击 Run Configuration

在Java Application中 设置 Project和MainClass


Arguments  加两个参数/home/cui/xxxx  hdf://cui:9000/xxx
Applay Run

第六步:查看结果。

显示运行成功。

在hadoop文件系统中已经看到了hadoop-0.20.2.tar.gz了。

至此文件通过代码上传成功!

 

将代码打包成jar文件,并测试。

1.  在Run Configurations里面命名为uploadfile。

2. 右击工程,执行Export,选择Runnable JAR file。点击下一步。

3. 选择刚才的configuration uploadfile,选择导出路径/home/cui。命名为upload.jar 

点击下一步,导出成功。

4. 通过java命令执行。

5. 通过浏览器查看。

upload_hdfs.jar 已经存在。

hadoop集群上传下载文件

 
在过年前夕我们小组就开始研究云计算了,之前都是在搭建云平台,配置hadoop集群环境,出现各种问题,各种不解,最终还是成功搭建了由10台机器组成的hadoop集群环境!由于写这篇技术博客的时候没有网络,暂时就不写搭建环境的步骤了,下面就在hadoop集群上运行的程序进行分析吧!
我们知道云计算应用中就有文件的上传和下载,我们提交任务到云端,让云端帮我们完成任务,下面就分析一下代码吧!
要在Hadoop集群进行操作,就应该用hadoop自己的一套api,只要我们和云端的hadoop集群连接上并用这套api编程,就可以享受云服务了!
文件的上传和下载最主要的就是org.apache.hadoop.fs FileSystem抽象类和org.apache.hadoop.conf Configuration类,再看看FileSystem的源代码:

public abstract class FileSystem extends Configured implements Closeable {

Hadoop集群上的文件系统也是与Configuration有关的,我们将这个类写进代码中,云端根据响应的api找到配置执行任务。
上传文件的代码:

//写入数据:从本机到云端(存储)
private static void uploadToHdfs() throws FileNotFoundException,IOException{
//我的文件地址
String localSrc = "H://Reading/Google_三大论文中文版.pdf";
//存放在云端的目的地址
String dest = "hdfs://192.168.1.11:9000/usr/Google_三大论文中文版.pdf";
InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
//得到配置对象
Configuration conf = new Configuration();
//文件系统
FileSystem fs = FileSystem.get(URI.create(dest), conf);
//输出流
OutputStream out = fs.create(new Path(dest), new Progressable() {
@Override
public void progress() {
System.out.println("上传完一个设定缓存区大小容量的文件!");
}
});
//连接两个流,形成通道,使输入流向输出流传输数据
IOUtils.copyBytes(in, out, 4096,true);
}

下载文件的代码:

//读入数据:从云端到本机(提取数据)
private static void readFromHdfs()throws FileNotFoundException,IOException{
//云端地址
String dest = "hdfs://192.168.1.11:9000/usr/Google_三大论文中文版.pdf";
//我的目录地址
String mySrc = "H://Google_三大论文中文版.pdf";
//得到配置
Configuration conf = new Configuration();
//实例化文件系统
FileSystem fs = FileSystem.get(URI.create(dest), conf);
//读出流
FSDataInputStream hdfsInStream = fs.open(new Path(dest));
//写入流
OutputStream out = new FileOutputStream(mySrc);
//将InputStrteam 中的内容通过IOUtils的copyBytes方法复制到out中
IOUtils.copyBytes(hdfsInStream, out, 4096,true);
}

这是一个小小的开端,今年要开始认真研究了!

CentOS 6.2最小化安装后再安装图形界面

在安装CentOS 6.2时发现它没有提示我要怎么安装,而是“自作主张”地给我选择了最小化安装,结果装完之后只有终端界面,因为有时候不得不用图形界面,所以如果没有图形界面也是很麻烦的。 后来通过百度才知道为什么它不让我选了,因为我用的是虚拟机,由于物理机本身配置就不高,所以我就没给虚拟机分配太多内存,而CentOS 6.2会根据你分配的内存大小决定安装方式,具体的界定我也忘记了,好像是512M还是1G,不过没关系,我们可以自己来安装图形界面,下面来说一下怎么安装。

经过以上5个步骤就可以进入图形界面了,因为我的默认语言是英文,所以才有第3、4步,如果你安装时选择的是中文,就可以跳过第3、4步了。 因为CentOS 6.2图形界面需要中文支持,所以安装中文包是必须的。

Ubuntu下配置samba实现文件夹共享

一. samba的安装:

sudo apt-get insall samba
sudo apt-get install smbfs

二. 创建共享目录:

mkdir /home/phinecos/share
sodu chmod 777 /home/phinecos/share

三. 创建Samba配置文件:

1. 保存现有的配置文件

sudo cp /etc/samba/smb.conf /etc/samba/smb.conf.bak

2. 修改现配置文件

sudo gedit /etc/samba/smb.conf

在smb.conf最后添加

[share]
path = /home/phinecos/share
available = yes
browsealbe = yes
public = yes
writable = yes

四. 创建samba帐户

sudo touch /etc/samba/smbpasswd
sudo smbpasswd -a phinecos

然后会要求你输入samba帐户的密码

[如果没有第四步,当你登录时会提示 session setup failed: NT_STATUS_LOGON_FAILURE]

五. 重启samba服务器

sudo /etc/init.d/samba restart

六. 测试

smbclient -L //localhost/share

七,使用

可以到windows下输入ip使用了,在文件夹处输入 “\\” + “Ubuntu机器的ip或主机名” + “\\” + “share”

VC2010中编译Sqlite3为静态库并带加密功能的方法.

废话不多说,直接贴出方法,目的在于请路过老鸟帮助测试一下功能是否全面,是否真正实现了加密(反正我用记事本打开生成的DB文件看不到写入记录的明文了)。

一、用开源的wxsqlite3(我用的版本是3.2.1.3)
1、到SQLite官网http://www.sqlite.org/下载sqlite-amalgamation-XXXXXX.zip(我下的版本是3.78)

,它已经包含了所有的源文件,也不需要另外的辅助工具了,解压到某一目录,如Sqlite3。
2、在VS2010里新建一个空工程,把所有文件放入工程内;
3、到http://wxcode.sourceforge.net/components/wxsqlite3下载wxsqlite3,然后把sqlite3secure.c

文件加入到工程即可;
4、在配置属性中设置配置类型为静态库(.Lib),添加预处理:
SQLITE_HAS_CODEC=1
CODEC_TYPE=CODEC_TYPE_AES128
SQLITE_CORE
THREADSAFE
SQLITE_SECURE_DELETE
SQLITE_SOUNDEX
SQLITE_ENABLE_COLUMN_METADATA
5、编译生成Lib文件。编译时如提示未找到某某文件,则从wxsqlite3的sqlite3\secure\src\codec-c目

录内拷贝相关文件至你的工程目录下再编译即可,具体要拷贝的文件有:
codec.c;codec.h;rijndael.c;rijndael.h;codecext.c

二、SQLite3的加密函数说明
sqlite3_key是输入密钥,如果数据库已加密必须先执行此函数并输入正确密钥才能进行操作,如果数据

库没有加密,执行此函数后进行数据库操作反而会出现“此数据库已加密或不是一个数据库文件”的错

误。
int sqlite3_key( sqlite3 *db, const void *pKey, int nKey),db 是指定数据库,pKey 是密钥,

nKey 是密钥长度。例:sqlite3_key( db, “abc”, 3);
sqlite3_rekey是变更密钥或给没有加密的数据库添加密钥或清空密钥,变更密钥或清空密钥前必须先正

确执行 sqlite3_key。在正确执行 sqlite3_rekey 之后在 sqlite3_close 关闭数据库之前可以正常操

作数据库,不需要再执行 sqlite3_key。
int sqlite3_rekey( sqlite3 *db, const void *pKey, int nKey),参数同上。
清空密钥为 sqlite3_rekey( db, NULL, 0)。
其实SQLite的两个加密函数使用起来非常的简单,下面分情况说明:
1、 给一个未加密的数据库添加密码:如果想要添加密码,则可以在打开数据库文件之后,关闭数据库

文件之前的任何时刻调用sqlite3_key函数即可,该函数有三个参数,其中第一个参数为数据库对象,第

二个参数是要设定的密码,第三个是密码的长度。例如:sqlite3_key(db,”1q2w3e4r”,8);        //给

数据库设定密码1q2w3e4r

注:如果数据库没有加密,执行此函数后进行数据库操作反而会出现“此数据库已加密或不是一个数据

库文件”的错误?经测试,只能在新建数据库时设置密码!
2、 读取一个加密数据库中的数据:完成这个任务依然十分简单,你只需要在打开数据库之后,再次调

用一下sqlite3_key函数即可,例如,但数据库密码是123456时,你只需要在代码中加入sqlite3_key

(db,”123456″,6);
3、 更改数据库密码:首先你需要使用当前的密码正确的打开数据库,之后你可以调用sqlite3_rekey

(db,”112233″,6) 来更改数据库密码。

4、删除密码:也就是把数据库恢复到明文状态。这时你仍然只需要调用sqlite3_rekey函数,并且把该

函数的第二个参数置为NULL或者””,或者把第三个参数设为0。

三、使用SQLITE3数据库在stdafx.h文件中加入的代码
#define SQLITE_HAS_CODEC 1
extern “C”
{
#include “sqlite3/sqlite3.h”
};
#ifdef _DEBUG
#pragma comment(lib, “sqlite3/Sqlite3EncryptionD.lib”)
#else
#pragma comment(lib, “sqlite3/Sqlite3Encryption.lib”)
#endif
/////////