自己紹介
こんにちは、astamuseでデータエンジニアをやってる朴と申します。
astamuse入社3年目になります。
最初の1年間はweb開発エンジニアをやってましたが、
もともとデータを色々いじるのが好きだったので、上司と相談して1年前から現在の仕事をさせていただくことになりました。
←('-')(ということで弊社はやりたい仕事が出来る環境ですよーというアピールでした)
仕事では主にHadoop,Sparkを使っておりますが、Hadoop歴が1年未満というのもあり
まだまだ勉強しながらやっているという感じです。
さて本日はHadoopの初心者向けの内容を書いてみようと思います。
Hadoopの初心者向けの内容といえば
WordCountが結構定番になりますが、
今日はこちらのサンプルを少し改造して天気予報データの集計を取ってみたいと思います。
やりたいこと&データ準備
気象データから特定の地域の降雨量を月ごとに合計したプログラムを作りたいと思います。
気象データは気象庁のホームページからダウンロードできます。
地点を選ぶ
- 埼玉県に住んでるので、埼玉県のさいたまと熊谷(いつも天気予報のニュースで取り上げられてるので)を選択
項目を選ぶ
期間を選ぶ
- 連続した期間で表示する(とりあえず、5年間:2011年1月1日~2015年12月31日)
表示オプション
右側のcsvをダウンロードを押す→ダウンロードは一瞬で終わるはず
ダウンロードしたcsvはこんな感じ
ダウンロードした時刻:2016/07/19 12:12:46
熊谷 さいたま
降水量の合計(mm) 降水量の合計(mm)
2011年1月1日,--,0
2011年1月2日,--,0
2011年1月3日,--,0
・
・
・
hdfs上処理しやすいように、以下の形に整形してutf-8で保存する(--は0に置換)
2011/1/1,0,0
2011/1/2,0,0
2011/1/3,0,0
2011/1/4,0,0
2011/1/5,0,0
2011/1/6,0,0
1カラム目:日付
2カラム目:さいたま市の降雨量
3カラム目:熊谷市の降雨量
一次整形
上記のデータをhdfs上保存する
hadoop fs -put ./weather_saitama.csv hadoop-example/input
一次整形としていったん月と地域の組み合わせで降雨量の合計値を出力したいと思います。
saitama-201101 6.5
kumagai-201101 4.5
saitama-201102 6.5
kumagai-201102 4.5
Mapperはこんな感じ↓
public class TokenizerMapper extends Mapper<Object, Text, Text, DoubleWritable> {
private Configuration conf;
@Override
public void setup(Context context) throws IOException, InterruptedException {
conf = context.getConfiguration();
}
@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] cols = line.split(",");
if (cols.length == 3) {
String[] date = cols[0].split("/");
String saitamaRainfall = cols[1];
String kumagaiRainfall = cols[2];
context.write(new Text("saitama" + "-" + date[0] + String.format("%02d", Integer.parseInt(date[1]))),
new DoubleWritable(saitamaRainfall));
context.write(new Text("kumagai" + "-" + date[0] + String.format("%02d", Integer.parseInt(date[1]))),
new DoubleWritable(kumagaiRainfall));
}
}
}
Reducerはこんな感じ↓
public class IntSumReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
private Configuration conf;
private DoubleWritable result = new DoubleWritable();
@Override
public void setup(Context context) throws IOException, InterruptedException {
conf = context.getConfiguration();
}
public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
Iterator<DoubleWritable> valueIter = values.iterator();
Double sum = 0;
for (DoubleWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
Jobをスタートするメインクラス
public class WeatherAnalyzer {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
String[] remainingArgs = optionParser.getRemainingArgs();
FileSystem.get(conf).delete(new Path(remainingArgs[1]), true);
Job job = Job.getInstance(conf, "WeatherAnalyzer");
job.setJarByClass(WeatherAnalyzer.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
hadoop jar hadoop-example-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.astamuse.hadoop.example.WeatherAnalyzer hadoop-example/input/weather_saitama.csv hadoop-example/output
実行して結果を見てみると
hadoop fs -ls hadoop-example/output
output/part-000
output/part-001
中身を見ると以下のように合計値がちゃんと出力されてますね
kumagai-201512 39.0
saitama-201111 35.0
最終形
最終的には市毎にoutputを分けようと思います。
output/saitama/part-000
output/saitama/part-001
・
・
output/kumagai/part-000
output/kumagai/part-001
・
・
ファイルを複数のフォルダに分けて出力する時はMultipleOutputsを使います。
Jobをスタートするメインクラス
public class WeatherAnalyzer {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
String[] remainingArgs = optionParser.getRemainingArgs();
FileSystem.get(conf).delete(new Path(remainingArgs[1]), true);
Job job = Job.getInstance(conf, "WeatherAnalyzer");
job.setJarByClass(WeatherAnalyzer.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setMapOutputKeyClass(Text.class);// MultipleOutputsを使用する時、MapのKey、Valueをちゃんと指定しないと何故かError: java.io.IOException: Type mismatch in key from mapが発生
job.setMapOutputValueClass(DoubleWritable.class);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);// outputフォルダに空のpart-00Xファイルが大量にできるのを防ぐ
MultipleOutputs.addNamedOutput(job, "saitama", TextOutputFormat.class, Text.class, DoubleWritable.class);
MultipleOutputs.addNamedOutput(job, "kumagai", TextOutputFormat.class, Text.class, DoubleWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Mapperは上と変わらず
Reducerはこんな感じ↓
public class IntSumReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
private Configuration conf;
private DoubleWritable result = new DoubleWritable();
private MultipleOutputs<Text, DoubleWritable> mos;
@Override
public void setup(Context context) throws IOException, InterruptedException {
conf = context.getConfiguration();
mos = new MultipleOutputs<Text, DoubleWritable>(context);
}
public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
Double sum = 0d;
for (DoubleWritable val : values) {
sum += val.get();
}
result.set(sum);
if (key.toString().startsWith("saitama")) {
mos.write("saitama", key, result, "saitama/");
} else {
mos.write("kumagai", key, result, "kumagai/");
}
}
@Override
protected void cleanup(final Context context) throws IOException, InterruptedException {
super.cleanup(context);
mos.close();// きちんとcloseしないと書き出しが反映されない
}
}
はい、これで出来上がりです。
次回のネタは決まっておりませんが、データ処理周りのネタを継続していきたいと思います。
最後に(余談)
弊社では辛い物が好きなエンジニア同士で定期的に中華会を行ってます。
店は筆者が大好きな羊肉串が自慢な延吉香です、
中国の本土の味(東北料理と四川料理)が堪能できますので、おすすめです!
では次回また!