astamuse Lab

astamuse Labとは、アスタミューゼのエンジニアとデザイナーのブログです。アスタミューゼの事業・サービスを支えている知識と舞台裏の今を発信しています。

HadoopのWordCountを天気予報のデータに適用してみよう!

自己紹介

こんにちは、astamuseでデータエンジニアをやってる朴と申します。

astamuse入社3年目になります。 最初の1年間はweb開発エンジニアをやってましたが、 もともとデータを色々いじるのが好きだったので、上司と相談して1年前から現在の仕事をさせていただくことになりました。 ←('-')(ということで弊社はやりたい仕事が出来る環境ですよーというアピールでした)

仕事では主にHadoop,Sparkを使っておりますが、Hadoop歴が1年未満というのもあり まだまだ勉強しながらやっているという感じです。

さて本日はHadoopの初心者向けの内容を書いてみようと思います。

Hadoopの初心者向けの内容といえば WordCountが結構定番になりますが、

今日はこちらのサンプルを少し改造して天気予報データの集計を取ってみたいと思います。

やりたいこと&データ準備

気象データから特定の地域の降雨量を月ごとに合計したプログラムを作りたいと思います。
気象データは気象庁のホームページからダウンロードできます。

  • 地点を選ぶ

    • 埼玉県に住んでるので、埼玉県のさいたまと熊谷(いつも天気予報のニュースで取り上げられてるので)を選択
  • 項目を選ぶ

    • データの種類
      • 日別値
    • 項目
      • 降水タブで降水量の日合計を選択
  • 期間を選ぶ

    • 連続した期間で表示する(とりあえず、5年間:2011年1月1日~2015年12月31日)
  • 表示オプション

    • 何もしない

右側のcsvをダウンロードを押す→ダウンロードは一瞬で終わるはず

ダウンロードしたcsvはこんな感じ

ダウンロードした時刻:2016/07/19 12:12:46


           熊谷             さいたま
           降水量の合計(mm) 降水量の合計(mm)
2011年1月1日,--,0
2011年1月2日,--,0
2011年1月3日,--,0
・
・
・

hdfs上処理しやすいように、以下の形に整形してutf-8で保存する(--は0に置換)

2011/1/1,0,0
2011/1/2,0,0
2011/1/3,0,0
2011/1/4,0,0
2011/1/5,0,0
2011/1/6,0,0

1カラム目:日付
2カラム目:さいたま市の降雨量
3カラム目:熊谷市の降雨量

一次整形

上記のデータをhdfs上保存する

hadoop fs -put ./weather_saitama.csv hadoop-example/input

一次整形としていったん月と地域の組み合わせで降雨量の合計値を出力したいと思います。

saitama-201101    6.5
kumagai-201101    4.5
saitama-201102    6.5
kumagai-201102    4.5

Mapperはこんな感じ↓

public class TokenizerMapper extends Mapper<Object, Text, Text, DoubleWritable> {

        private Configuration conf;

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            conf = context.getConfiguration();
        }

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] cols = line.split(",");
            if (cols.length == 3) {
                String[] date = cols[0].split("/");
                String saitamaRainfall = cols[1];
                String kumagaiRainfall = cols[2];
                context.write(new Text("saitama" + "-" + date[0] + String.format("%02d", Integer.parseInt(date[1]))),
                        new DoubleWritable(saitamaRainfall));
                context.write(new Text("kumagai" + "-" + date[0] + String.format("%02d", Integer.parseInt(date[1]))),
                        new DoubleWritable(kumagaiRainfall));
            }
        }
}

Reducerはこんな感じ↓

public class IntSumReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        private Configuration conf;
        private DoubleWritable result = new DoubleWritable();

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            conf = context.getConfiguration();
        }

        public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            Iterator<DoubleWritable> valueIter = values.iterator();

            Double sum = 0;
            for (DoubleWritable val : values) {
                sum += val.get();
            }

            result.set(sum);

            context.write(key, result);
        }

}

Jobをスタートするメインクラス

public class WeatherAnalyzer {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionParser.getRemainingArgs();

        FileSystem.get(conf).delete(new Path(remainingArgs[1]), true);

        Job job = Job.getInstance(conf, "WeatherAnalyzer");
        job.setJarByClass(WeatherAnalyzer.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(IntSumReducer.class);

        FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

hadoop jar hadoop-example-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.astamuse.hadoop.example.WeatherAnalyzer hadoop-example/input/weather_saitama.csv hadoop-example/output

実行して結果を見てみると

hadoop fs -ls hadoop-example/output

output/part-000
output/part-001

中身を見ると以下のように合計値がちゃんと出力されてますね

kumagai-201512    39.0
saitama-201111    35.0

最終形

最終的には市毎にoutputを分けようと思います。

output/saitama/part-000
output/saitama/part-001
・
・

output/kumagai/part-000
output/kumagai/part-001
・
・

ファイルを複数のフォルダに分けて出力する時はMultipleOutputsを使います。

Jobをスタートするメインクラス

public class WeatherAnalyzer {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionParser.getRemainingArgs();

        FileSystem.get(conf).delete(new Path(remainingArgs[1]), true);

        Job job = Job.getInstance(conf, "WeatherAnalyzer");
        job.setJarByClass(WeatherAnalyzer.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(IntSumReducer.class);
        job.setMapOutputKeyClass(Text.class);// MultipleOutputsを使用する時、MapのKey、Valueをちゃんと指定しないと何故かError: java.io.IOException: Type mismatch in key from mapが発生
        job.setMapOutputValueClass(DoubleWritable.class);

        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);// outputフォルダに空のpart-00Xファイルが大量にできるのを防ぐ

        MultipleOutputs.addNamedOutput(job, "saitama", TextOutputFormat.class, Text.class, DoubleWritable.class);
        MultipleOutputs.addNamedOutput(job, "kumagai", TextOutputFormat.class, Text.class, DoubleWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

Mapperは上と変わらず

Reducerはこんな感じ↓

public class IntSumReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        private Configuration conf;
        private DoubleWritable result = new DoubleWritable();
        private MultipleOutputs<Text, DoubleWritable> mos;

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            conf = context.getConfiguration();
            mos = new MultipleOutputs<Text, DoubleWritable>(context);
        }

        public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            Double sum = 0d;
            for (DoubleWritable val : values) {
                sum += val.get();
            }

            result.set(sum);
            if (key.toString().startsWith("saitama")) {
                mos.write("saitama", key, result, "saitama/");
            } else {
                mos.write("kumagai", key, result, "kumagai/");
            }
        }

        @Override
        protected void cleanup(final Context context) throws IOException, InterruptedException {
            super.cleanup(context);
            mos.close();// きちんとcloseしないと書き出しが反映されない
        }

}

はい、これで出来上がりです。

次回のネタは決まっておりませんが、データ処理周りのネタを継続していきたいと思います。

最後に(余談)

弊社では辛い物が好きなエンジニア同士で定期的に中華会を行ってます。
店は筆者が大好きな羊肉串が自慢な延吉香です、 中国の本土の味(東北料理と四川料理)が堪能できますので、おすすめです!

では次回また!

Copyright © astamuse company, ltd. all rights reserved.