How does one implement a Hadoop Mapper in Scala 2.9.0?

Developer https://www.devze.com 2023-03-06 16:22 Source: Internet
When I migrated from Scala 2.8.1 to 2.9.0, all of my code kept working except for the Hadoop mappers. Because I had some wrapper objects in the way, I distilled the problem down to the following example:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{Mapper, Job}

object MyJob {
  def main(args:Array[String]) {
    val job = new Job(new Configuration())
    job.setMapperClass(classOf[MyMapper])

  }
}

class MyMapper extends Mapper[LongWritable,Text,Text,Text] {
  override def map(key: LongWritable, value: Text, context: Mapper[LongWritable,Text,Text,Text]#Context) {

  }
}

When I run this in 2.8.1, it runs quite well (and I have plenty of production code in 2.8.1). In 2.9.0, I get the following compilation error:

error: type mismatch;
found   : java.lang.Class[MyMapper](classOf[MyMapper])
required: java.lang.Class[_ <: org.apache.hadoop.mapreduce.Mapper]
job.setMapperClass(classOf[MyMapper])

The call that fails is setMapperClass on the Job object. Here's the definition of that method:

public void setMapperClass(java.lang.Class<? extends org.apache.hadoop.mapreduce.Mapper> cls) throws java.lang.IllegalStateException { /* compiled code */ }

The definition of the Mapper class itself is this:

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

Does anyone have a sense of what I'm doing wrong? It looks to me like the type is fundamentally correct: MyMapper does extend Mapper, and the method wants something that extends Mapper. And it works great in 2.8.1...
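For reference, here is a minimal sketch of how the Java signature above reads from the Scala side. Scala generally models a Java raw type such as `Mapper` as an existential type, so `Class<? extends Mapper>` corresponds roughly to `Class[_ <: Mapper[_, _, _, _]]`. The `Mapper` and `MyMapper` classes and the `acceptMapperClass` method below are hypothetical stand-ins (not the Hadoop classes), used only so the sketch compiles without Hadoop on the classpath. It shows the expected typing relationship, not the cause of the 2.9.0 error.

```scala
// Hypothetical stand-in for org.apache.hadoop.mapreduce.Mapper (not the real class).
class Mapper[KEYIN, VALUEIN, KEYOUT, VALUEOUT]

// Hypothetical mapper with concrete type arguments, mirroring MyMapper in the question.
class MyMapper extends Mapper[Long, String, String, String]

object RawTypeSketch {
  // Stand-in for Job.setMapperClass: the parameter type is the Scala view of
  // Java's Class<? extends Mapper>, with the raw Mapper written as an existential.
  def acceptMapperClass(cls: Class[_ <: Mapper[_, _, _, _]]): Boolean =
    classOf[Mapper[_, _, _, _]].isAssignableFrom(cls)

  def main(args: Array[String]): Unit = {
    // Class[MyMapper] conforms to Class[_ <: Mapper[_, _, _, _]],
    // because MyMapper is a subtype of Mapper for some type arguments.
    println(acceptMapperClass(classOf[MyMapper]))
  }
}
```

Under these assumptions the call type-checks, which matches the asker's intuition that `classOf[MyMapper]` should be acceptable to `setMapperClass`.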


Silly as it seems, you can work around the problem by defining the Mapper before the Job. The following compiles:

import org.apache.hadoop._
import org.apache.hadoop.io._
import org.apache.hadoop.conf._
import org.apache.hadoop.mapreduce._

class MyMapper extends Mapper[LongWritable,Text,Text,Text] {
  override def map(key: LongWritable, value: Text, context: Mapper[LongWritable,Text,Text,Text]#Context) {
  }
}

object MyJob {
  def main(args:Array[String]) {
    val job = new Job(new Configuration())
    job.setMapperClass(classOf[MyMapper])
  }
}