fix LzoBinaryBlockRecordReader bug: skip bad block rather than throws IOE#476
fix LzoBinaryBlockRecordReader bug: skip bad block rather than throws IOE#476zhuguangbin wants to merge 1 commit into
Conversation
|
I think we would want this behind some config settings. Maybe something similar to that allows for setting a max number of tolerated errors. Additionally, I think we'd want to be sure that catching this sort of exception is recoverable. How do we know the rest of the file isn't corrupt? If a proto blob is corrupt, we need to be sure we know where the next good one starts. I would also point out that you may want to write your files to a temporary location and then "commit" them by closing them and then moving them to a final destination. That way if something crashes, the whole file gets thrown out and you can rebuild it from scratch. |
|
zhuguangbin seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it. |
In some scenario, LzoThriftBlock file may be corrupt. For example, in our company, we consume kafka data and sink to HDFS as LzoThriftBlock format using a flink streaming job. If the job crash, the writing file may be corrupt. The mapreduce job reading the corrupt file will fail .
Error logs are as follows:
org.apache.hadoop.mapred.YarnChild: Exception running child : java.io.IOException: Premature EOF from inputStream
at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:201)
at com.twitter.elephantbird.mapreduce.io.BinaryBlockReader.parseNextBlock(BinaryBlockReader.java:145)
at com.twitter.elephantbird.mapreduce.io.BinaryBlockReader.setupNewBlockIfNeeded(BinaryBlockReader.java:169)
at com.twitter.elephantbird.mapreduce.io.BinaryBlockReader.readNextProtoBytes(BinaryBlockReader.java:87)
at com.twitter.elephantbird.mapreduce.io.BinaryBlockReader.readNext(BinaryBlockReader.java:74)
at com.twitter.elephantbird.mapreduce.input.LzoBinaryBlockRecordReader.nextKeyValue(LzoBinaryBlockRecordReader.java:138)
at com.twitter.elephantbird.pig.load.LzoBaseLoadFunc.getNextBinaryValue(LzoBaseLoadFunc.java:108)
at com.twitter.elephantbird.pig.load.ThriftPigLoader.getNext(ThriftPigLoader.java:48)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader.nextKeyValue(PigRecordReader.java:211)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:556)
at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
The reason is the last block of the split is incomplete because of suddenly quit of the writing steam. I think this could be more tolerant,although the last block is corrupt, the preceding blocks of the split is OK. All we should do is skipping the corrupt block.