001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap;
022    
023    import java.beans.ConstructorProperties;
024    import java.io.IOException;
025    import java.util.ArrayList;
026    import java.util.Arrays;
027    import java.util.HashSet;
028    import java.util.Iterator;
029    import java.util.List;
030    import java.util.Map;
031    import java.util.Set;
032    
033    import cascading.flow.FlowProcess;
034    import cascading.scheme.NullScheme;
035    import cascading.scheme.Scheme;
036    import cascading.tuple.Fields;
037    import cascading.tuple.TupleEntry;
038    import cascading.tuple.TupleEntryCollector;
039    import cascading.util.Util;
040    import org.slf4j.Logger;
041    import org.slf4j.LoggerFactory;
042    
043    /**
044     * Class MultiSinkTap is both a {@link cascading.tap.CompositeTap} and {@link cascading.tap.SinkTap} that can write to
045     * multiple child {@link cascading.tap.Tap} instances simultaneously.
046     * <p/>
047     * It is the counterpart to {@link cascading.tap.MultiSourceTap}.
048     * <p/>
049     * Note all child Tap instances may or may not have the same declared Fields. In the case they do not, all
050     * sink fields will be merged into a single Fields instance via {@link Fields#merge(cascading.tuple.Fields...)}.
051     */
052    public class MultiSinkTap<Child extends Tap, Config, Output> extends SinkTap<Config, Output> implements CompositeTap<Child>
053      {
054      /** Field LOG */
055      private static final Logger LOG = LoggerFactory.getLogger( MultiSinkTap.class );
056    
057      /** Field taps */
058      private final Child[] taps;
059      /** Field tempPath */
060      private final String tempPath = "__multisink_placeholder_" + Util.createUniqueID();
061      /** Field childConfigs */
062      private List<Map<String, String>> childConfigs;
063    
064      private class MultiSinkCollector extends TupleEntryCollector
065        {
066        TupleEntryCollector[] collectors;
067    
068        public MultiSinkCollector( FlowProcess<Config> flowProcess, Tap... taps ) throws IOException
069          {
070          super( Fields.asDeclaration( getSinkFields() ) );
071    
072          collectors = new TupleEntryCollector[ taps.length ];
073    
074          Config conf = flowProcess.getConfigCopy();
075    
076          for( int i = 0; i < taps.length; i++ )
077            {
078            Config mergedConf = childConfigs == null ? conf : flowProcess.mergeMapIntoConfig( conf, childConfigs.get( i ) );
079            Tap tap = taps[ i ];
080            LOG.info( "opening for write: {}", tap.toString() );
081    
082            collectors[ i ] = tap.openForWrite( flowProcess.copyWith( mergedConf ), null );
083            }
084          }
085    
086        protected void collect( TupleEntry tupleEntry ) throws IOException
087          {
088          for( int i = 0; i < taps.length; i++ )
089            collectors[ i ].add( tupleEntry );
090          }
091    
092        @Override
093        public void close()
094          {
095          super.close();
096    
097          try
098            {
099            for( TupleEntryCollector collector : collectors )
100              {
101              try
102                {
103                collector.close();
104                }
105              catch( Exception exception )
106                {
107                LOG.warn( "exception closing TupleEntryCollector", exception );
108                }
109              }
110            }
111          finally
112            {
113            collectors = null;
114            }
115          }
116        }
117    
118      /**
119       * Constructor MultiSinkTap creates a new MultiSinkTap instance.
120       *
121       * @param taps of type Tap...
122       */
123      @ConstructorProperties({"taps"})
124      public MultiSinkTap( Child... taps )
125        {
126        this.taps = taps;
127        }
128    
129      protected Child[] getTaps()
130        {
131        return taps;
132        }
133    
134      @Override
135      public Iterator<Child> getChildTaps()
136        {
137        return Arrays.asList( getTaps() ).iterator();
138        }
139    
140      @Override
141      public long getNumChildTaps()
142        {
143        return getTaps().length;
144        }
145    
146      @Override
147      public String getIdentifier()
148        {
149        return tempPath;
150        }
151    
152      @Override
153      public void presentSinkFields( FlowProcess<Config> flowProcess, Fields fields )
154        {
155        for( Tap child : getTaps() )
156          child.presentSinkFields( flowProcess, fields );
157        }
158    
159      @Override
160      public TupleEntryCollector openForWrite( FlowProcess<Config> flowProcess, Output output ) throws IOException
161        {
162        return new MultiSinkCollector( flowProcess, getTaps() );
163        }
164    
165      @Override
166      public void sinkConfInit( FlowProcess<Config> process, Config conf )
167        {
168        childConfigs = new ArrayList<Map<String, String>>();
169    
170        for( int i = 0; i < getTaps().length; i++ )
171          {
172          Tap tap = getTaps()[ i ];
173          Config jobConf = process.copyConfig( conf );
174    
175          tap.sinkConfInit( process, jobConf );
176    
177          childConfigs.add( process.diffConfigIntoMap( conf, jobConf ) );
178          }
179        }
180    
181      @Override
182      public boolean createResource( Config conf ) throws IOException
183        {
184        for( Tap tap : getTaps() )
185          {
186          if( !tap.createResource( conf ) )
187            return false;
188          }
189    
190        return true;
191        }
192    
193      @Override
194      public boolean deleteResource( Config conf ) throws IOException
195        {
196        for( Tap tap : getTaps() )
197          {
198          if( !tap.deleteResource( conf ) )
199            return false;
200          }
201    
202        return true;
203        }
204    
205      @Override
206      public boolean commitResource( Config conf ) throws IOException
207        {
208        for( Tap tap : getTaps() )
209          {
210          if( !tap.commitResource( conf ) )
211            return false;
212          }
213    
214        return true;
215        }
216    
217      @Override
218      public boolean rollbackResource( Config conf ) throws IOException
219        {
220        for( Tap tap : getTaps() )
221          {
222          if( !tap.rollbackResource( conf ) )
223            return false;
224          }
225    
226        return true;
227        }
228    
229      @Override
230      public boolean resourceExists( Config conf ) throws IOException
231        {
232        for( Tap tap : getTaps() )
233          {
234          if( !tap.resourceExists( conf ) )
235            return false;
236          }
237    
238        return true;
239        }
240    
241      @Override
242      public long getModifiedTime( Config conf ) throws IOException
243        {
244        long modified = getTaps()[ 0 ].getModifiedTime( conf );
245    
246        for( int i = 1; i < getTaps().length; i++ )
247          modified = Math.max( getTaps()[ i ].getModifiedTime( conf ), modified );
248    
249        return modified;
250        }
251    
252      @Override
253      public Scheme getScheme()
254        {
255        if( super.getScheme() != null )
256          return super.getScheme();
257    
258        Set<Fields> fields = new HashSet<Fields>();
259    
260        for( Tap child : getTaps() )
261          fields.add( child.getSinkFields() );
262    
263        // if all schemes have the same sink fields, the just use the scheme
264        if( fields.size() == 1 )
265          {
266          setScheme( getTaps()[ 0 ].getScheme() );
267          return super.getScheme();
268          }
269    
270        Fields allFields = Fields.merge( fields.toArray( new Fields[ fields.size() ] ) );
271    
272        setScheme( new NullScheme( allFields, allFields ) );
273    
274        return super.getScheme();
275        }
276    
277      @Override
278      public String toString()
279        {
280        return "MultiSinkTap[" + ( taps == null ? "none" : Arrays.asList( taps ) ) + ']';
281        }
282    
283      @Override
284      public boolean equals( Object o )
285        {
286        if( this == o )
287          return true;
288        if( !( o instanceof MultiSinkTap ) )
289          return false;
290        if( !super.equals( o ) )
291          return false;
292    
293        MultiSinkTap that = (MultiSinkTap) o;
294    
295        if( !Arrays.equals( taps, that.taps ) )
296          return false;
297    
298        return true;
299        }
300    
301      @Override
302      public int hashCode()
303        {
304        int result = super.hashCode();
305        result = 31 * result + ( taps != null ? Arrays.hashCode( taps ) : 0 );
306        return result;
307        }
308      }