001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.hadoop.util;
022
023import java.io.IOException;
024import java.net.URI;
025
026import cascading.flow.FlowException;
027import cascading.flow.hadoop.HadoopFlowProcess;
028import cascading.scheme.hadoop.TextLine;
029import cascading.tap.SinkMode;
030import cascading.tap.hadoop.Hfs;
031import cascading.tap.hadoop.Lfs;
032import cascading.tuple.Fields;
033import cascading.tuple.Tuple;
034import cascading.tuple.TupleEntryCollector;
035import cascading.tuple.TupleEntryIterator;
036import cascading.util.Util;
037import org.apache.hadoop.filecache.DistributedCache;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.mapred.JobConf;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 *
045 */
046public class HadoopMRUtil
047  {
048  private static final Logger LOG = LoggerFactory.getLogger( HadoopMRUtil.class );
049
050  public static String writeStateToDistCache( JobConf conf, String id, String kind, String stepState )
051    {
052    if( Util.isEmpty( stepState ) )
053      return null;
054
055    LOG.info( "writing step state to dist cache, too large for job conf, size: {}", stepState.length() );
056
057    String statePath = Hfs.getTempPath( conf ) + "/" + kind + "-state-" + id;
058
059    Hfs temp = new Hfs( new TextLine(), statePath, SinkMode.REPLACE );
060
061    try
062      {
063      TupleEntryCollector writer = temp.openForWrite( new HadoopFlowProcess( conf ) );
064
065      writer.add( new Tuple( stepState ) );
066
067      writer.close();
068      }
069    catch( IOException exception )
070      {
071      throw new FlowException( "unable to write step state to Hadoop FS: " + temp.getIdentifier() );
072      }
073
074    URI uri = new Path( statePath ).toUri();
075    DistributedCache.addCacheFile( uri, conf );
076
077    LOG.info( "using step state path: {}", uri );
078
079    return statePath;
080    }
081
082  public static String readStateFromDistCache( JobConf jobConf, String id, String kind ) throws IOException
083    {
084    Path[] files = DistributedCache.getLocalCacheFiles( jobConf );
085
086    Path stepStatePath = null;
087
088    for( Path file : files )
089      {
090      if( !file.toString().contains( kind + "-state-" + id ) )
091        continue;
092
093      stepStatePath = file;
094      break;
095      }
096
097    if( stepStatePath == null )
098      throw new FlowException( "unable to find step state from distributed cache" );
099
100    LOG.info( "reading step state from local path: {}", stepStatePath );
101
102    Hfs temp = new Lfs( new TextLine( new Fields( "line" ) ), stepStatePath.toString() );
103
104    TupleEntryIterator reader = null;
105
106    try
107      {
108      reader = temp.openForRead( new HadoopFlowProcess( jobConf ) );
109
110      if( !reader.hasNext() )
111        throw new FlowException( "step state path is empty: " + temp.getIdentifier() );
112
113      return reader.next().getString( 0 );
114      }
115    catch( IOException exception )
116      {
117      throw new FlowException( "unable to find state path: " + temp.getIdentifier(), exception );
118      }
119    finally
120      {
121      try
122        {
123        if( reader != null )
124          reader.close();
125        }
126      catch( IOException exception )
127        {
128        LOG.warn( "error closing state path reader", exception );
129        }
130      }
131    }
132
133  public static boolean hasReducer( JobConf jobConf )
134    {
135    return jobConf.getReducerClass() != null;
136    }
137  }