001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap.hadoop.io;
022    
023    import java.io.Closeable;
024    import java.io.IOException;
025    
026    import cascading.flow.FlowProcess;
027    import cascading.flow.hadoop.HadoopFlowProcess;
028    import cascading.tap.Tap;
029    import cascading.tap.TapException;
030    import cascading.tap.hadoop.util.Hadoop18TapUtil;
031    import org.apache.hadoop.fs.Path;
032    import org.apache.hadoop.mapred.FileOutputFormat;
033    import org.apache.hadoop.mapred.JobConf;
034    import org.apache.hadoop.mapred.OutputCollector;
035    import org.apache.hadoop.mapred.OutputFormat;
036    import org.apache.hadoop.mapred.RecordReader;
037    import org.apache.hadoop.mapred.RecordWriter;
038    import org.apache.hadoop.mapred.Reporter;
039    import org.slf4j.Logger;
040    import org.slf4j.LoggerFactory;
041    
042    /**
043     *
044     */
045    public class TapOutputCollector implements OutputCollector, Closeable
046      {
047      private static final Logger LOG = LoggerFactory.getLogger( TapOutputCollector.class );
048    
049      public static final String PART_TASK_PATTERN = "%s%spart-%05d";
050      public static final String PART_TASK_SEQ_PATTERN = "%s%spart-%05d-%05d";
051    
052      /** Field conf */
053      private JobConf conf;
054      /** Field writer */
055      private RecordWriter writer;
056      /** Field filenamePattern */
057      private String filenamePattern;
058      /** Field filename */
059      private String filename;
060      /** Field tap */
061      private Tap<JobConf, RecordReader, OutputCollector> tap;
062      /** Field prefix */
063      private String prefix;
064      /** Field sequence */
065      private long sequence;
066      /** Field isFileOutputFormat */
067      private boolean isFileOutputFormat;
068      /** Field flowProcess */
069      private final FlowProcess<JobConf> flowProcess;
070    
071      public TapOutputCollector( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap ) throws IOException
072        {
073        this( flowProcess, tap, null );
074        }
075    
076      public TapOutputCollector( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, String prefix ) throws IOException
077        {
078        this( flowProcess, tap, prefix, -1 );
079        }
080    
081      public TapOutputCollector( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, String prefix, long sequence ) throws IOException
082        {
083        this.tap = tap;
084        this.sequence = sequence;
085        this.prefix = prefix == null || prefix.length() == 0 ? null : prefix;
086        this.flowProcess = flowProcess;
087        this.conf = this.flowProcess.getConfigCopy();
088        this.filenamePattern = this.conf.get( "cascading.tapcollector.partname", sequence == -1 ? PART_TASK_PATTERN : PART_TASK_SEQ_PATTERN );
089    
090        initialize();
091        }
092    
093      protected void initialize() throws IOException
094        {
095        tap.sinkConfInit( flowProcess, conf );
096    
097        OutputFormat outputFormat = conf.getOutputFormat();
098    
099        isFileOutputFormat = outputFormat instanceof FileOutputFormat;
100    
101        if( isFileOutputFormat )
102          {
103          Hadoop18TapUtil.setupJob( conf );
104          Hadoop18TapUtil.setupTask( conf );
105    
106          if( prefix != null )
107            filename = String.format( filenamePattern, prefix, "/", conf.getInt( "mapred.task.partition", 0 ), sequence );
108          else
109            filename = String.format( filenamePattern, "", "", conf.getInt( "mapred.task.partition", 0 ), sequence );
110          }
111    
112        LOG.info( "creating path: {}", filename );
113    
114        writer = outputFormat.getRecordWriter( null, conf, filename, getReporter() );
115        }
116    
117      private Reporter getReporter()
118        {
119        Reporter reporter = Reporter.NULL;
120    
121        if( flowProcess instanceof HadoopFlowProcess )
122          reporter = ( (HadoopFlowProcess) flowProcess ).getReporter(); // may return Reporter.NULL
123    
124        return reporter;
125        }
126    
127      /**
128       * Method collect writes the given values to the {@link Tap} this instance encapsulates.
129       *
130       * @param writableComparable of type WritableComparable
131       * @param writable           of type Writable
132       * @throws IOException when
133       */
134      public void collect( Object writableComparable, Object writable ) throws IOException
135        {
136        flowProcess.keepAlive();
137        writer.write( writableComparable, writable );
138        }
139    
140      public void close()
141        {
142        try
143          {
144          if( isFileOutputFormat )
145            LOG.info( "closing tap collector for: {}", new Path( tap.getIdentifier(), filename ) );
146          else
147            LOG.info( "closing tap collector for: {}", tap );
148    
149          try
150            {
151            writer.close( getReporter() );
152            }
153          finally
154            {
155            if( isFileOutputFormat )
156              {
157              if( Hadoop18TapUtil.needsTaskCommit( conf ) )
158                Hadoop18TapUtil.commitTask( conf );
159    
160              Hadoop18TapUtil.cleanupJob( conf );
161              }
162            }
163          }
164        catch( IOException exception )
165          {
166          LOG.warn( "exception closing: {}", filename, exception );
167          throw new TapException( "exception closing: " + filename, exception );
168          }
169        }
170      }