I am trying to write a simple MapReduce program using Hadoop that finds the month most prone to flu. I am using the Google Flu Trends dataset, which can be found at http://www.google.org/flutrends/data.txt.
I have written both the Mapper and the Reducer, shown below.
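import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Calendar;
import java.util.List;

// Assumed to be Commons Lang 2.x; the original post does not show its imports.
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxFluPerMonthMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    private static final Log LOG = LogFactory.getLog(MaxFluPerMonthMapper.class);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String row = value.toString();
        LOG.debug("Received row " + row);
        List<String> columns = Arrays.asList(row.split(","));
        String date = columns.get(0);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        // Calendar.MONTH is zero-based (0 = January).
        // Note: if the date cannot be parsed, month stays 0 and the row counts as January.
        int month = 0;
        try {
            Calendar calendar = Calendar.getInstance();
            calendar.setTime(sdf.parse(date));
            month = calendar.get(Calendar.MONTH);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        // Every column after the date is a flu index; emit one (month, index) pair per numeric cell.
        for (int i = 1; i < columns.size(); i++) {
            String fluIndex = columns.get(i);
            if (StringUtils.isNotBlank(fluIndex) && StringUtils.isNumeric(fluIndex)) {
                LOG.info("Writing key " + month + " and value " + fluIndex);
                context.write(new IntWritable(month), new IntWritable(Integer.valueOf(fluIndex)));
            }
        }
    }
}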
Reducer
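import java.io.IOException;
import java.text.DateFormatSymbols;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxFluPerMonthReducer extends Reducer<IntWritable, IntWritable, Text, IntWritable> {
    private static final Log LOG = LogFactory.getLog(MaxFluPerMonthReducer.class);

    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        LOG.info("Received key " + key.get());
        // Sum all flu indices emitted for this month.
        int sum = 0;
        for (IntWritable intWritable : values) {
            sum += intWritable.get();
        }
        // getMonths() is indexed from 0 (January), matching Calendar.MONTH.
        int month = key.get();
        String monthString = new DateFormatSymbols().getMonths()[month];
        context.write(new Text(monthString), new IntWritable(sum));
    }
}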
With the Mapper and Reducer shown above, I get the following output:
January 545419
February 528022
March 436348
April 336759
May 346482
June 309795
July 312966
August 307346
September 322359
October 428346
November 461195
December 480078
What I want is just a single line of output: January 545419. How can I achieve this? By storing state in the reducer, or is there another solution? Or are my mapper and reducer wrong for the question I am asking of this dataset?
The problem is that, by design, a reduce call sees only the values for its own key and knows nothing about the other keys. It would be possible to set up another Reducer to find the maximum value across all the output of your current reducer. However, this is overkill: you know you will have only 12 records to process, and setting up another Reducer carries more overhead than just running a serial script.
I would suggest writing some other script to process your text output.
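As a rough sketch of such a post-processing script, the plain Java program below (no Hadoop required) reads the reducer output and keeps the line with the largest total. It assumes each line has the form month, tab, total, which is how the default text output is normally written. The class name MaxMonthFinder and the default file name part-r-00000 are illustrative assumptions, not something taken from the question.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

// Hypothetical helper, not part of the original job.
class MaxMonthFinder {

    // Returns the "month<TAB>total" line with the largest total,
    // or null if no line could be parsed.
    static String findMax(List<String> lines) {
        String best = null;
        long bestTotal = Long.MIN_VALUE;
        for (String line : lines) {
            String[] parts = line.split("\t");
            if (parts.length != 2) {
                continue; // skip blank or malformed lines
            }
            long total;
            try {
                total = Long.parseLong(parts[1].trim());
            } catch (NumberFormatException e) {
                continue; // skip lines whose second field is not a number
            }
            if (total > bestTotal) {
                bestTotal = total;
                best = line;
            }
        }
        return best;
    }

    public static void main(String[] args) throws IOException {
        // Assumed file name; pass the real path as the first argument.
        String path = args.length > 0 ? args[0] : "part-r-00000";
        System.out.println(findMax(Files.readAllLines(Paths.get(path))));
    }
}
```

With only 12 lines of input, this runs instantly on a single machine, which is the point of avoiding a second job.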
You can add one more MapReduce step, with a Mapper something like this:
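import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // With the default TextInputFormat the key is the line's byte offset,
        // so 0 means the first row of the file. Emit only that row.
        if (key.get() == 0) {
            String row = value.toString();
            String[] values = row.split("\t");
            context.write(new Text(values[0]), new Text(values[1]));
        }
    }
}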
The Reducer must emit all its input (which will be only one record) directly to the output. The number of mappers and reducers should be set to one. If your MapReduce job uses more than one reducer, you should add another intermediate MapReduce step to combine the results into one file after your job. But this approach does not seem very efficient.
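Taking the first row gives the right answer here only because January (key 0) happens to have the largest total; the first job's output is ordered by month, not by sum. The question also asks about storing state in the reducer, and with a single reducer (for example via Job.setNumReduceTasks(1)) that is a workable option: remember the best month while reducing and write it once at the end. The sketch below models that pattern in plain Java so it runs without a cluster. The class MaxTrackingReducerModel is hypothetical; in a real job its two methods would correspond to the reduce and cleanup methods of Reducer, and the sample numbers are made up.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of a single reducer that remembers the largest sum seen so far.
// Hypothetical class: in Hadoop these methods would override Reducer.reduce and Reducer.cleanup.
class MaxTrackingReducerModel {
    private String maxMonth = null;
    private long maxSum = Long.MIN_VALUE;

    // Called once per key: sums the values and updates the running maximum.
    // Nothing is written to the output here.
    void reduce(String month, List<Integer> values) {
        long sum = 0;
        for (int v : values) {
            sum += v;
        }
        if (sum > maxSum) {
            maxSum = sum;
            maxMonth = month;
        }
    }

    // Called once after the last key: the only place a record is produced.
    String cleanup() {
        return maxMonth == null ? null : maxMonth + "\t" + maxSum;
    }

    public static void main(String[] args) {
        MaxTrackingReducerModel reducer = new MaxTrackingReducerModel();
        // Made-up sample values, not taken from the dataset.
        reducer.reduce("January", Arrays.asList(10, 20, 30));
        reducer.reduce("February", Arrays.asList(5, 15));
        System.out.println(reducer.cleanup());
    }
}
```

This only works when every key reaches the same reducer instance. With several reducers, each one would report its own local maximum and a further combining step would still be needed.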