001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tap;
022
023import java.beans.ConstructorProperties;
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.HashSet;
028import java.util.Iterator;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032
033import cascading.flow.FlowProcess;
034import cascading.scheme.NullScheme;
035import cascading.scheme.Scheme;
036import cascading.tuple.Fields;
037import cascading.tuple.TupleEntry;
038import cascading.tuple.TupleEntryCollector;
039import cascading.util.Util;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * Class MultiSinkTap is both a {@link cascading.tap.CompositeTap} and {@link cascading.tap.SinkTap} that can write to
045 * multiple child {@link cascading.tap.Tap} instances simultaneously.
046 * <p/>
047 * It is the counterpart to {@link cascading.tap.MultiSourceTap}.
048 * <p/>
049 * Note all child Tap instances may or may not have the same declared Fields. In the case they do not, all
050 * sink fields will be merged into a single Fields instance via {@link Fields#merge(cascading.tuple.Fields...)}.
051 */
052public class MultiSinkTap<Child extends Tap, Config, Output> extends SinkTap<Config, Output> implements CompositeTap<Child>
053  {
054  /** Field LOG */
055  private static final Logger LOG = LoggerFactory.getLogger( MultiSinkTap.class );
056
057  /** Field taps */
058  private final Child[] taps;
059  /** Field tempPath */
060  private final String tempPath = "__multisink_placeholder_" + Util.createUniqueID();
061  /** Field childConfigs */
062  private List<Map<String, String>> childConfigs;
063
064  private class MultiSinkCollector extends TupleEntryCollector
065    {
066    TupleEntryCollector[] collectors;
067
068    public <C extends Config> MultiSinkCollector( FlowProcess<C> flowProcess, Tap<Config, ?, ?>... taps ) throws IOException
069      {
070      super( Fields.asDeclaration( getSinkFields() ) );
071
072      collectors = new TupleEntryCollector[ taps.length ];
073
074      C conf = flowProcess.getConfigCopy();
075
076      for( int i = 0; i < taps.length; i++ )
077        {
078        C mergedConf = childConfigs == null ? conf : flowProcess.mergeMapIntoConfig( conf, childConfigs.get( i ) );
079        Tap<Config, ?, ?> tap = taps[ i ];
080        LOG.info( "opening for write: {}", tap.toString() );
081
082        collectors[ i ] = tap.openForWrite( flowProcess.copyWith( mergedConf ), null );
083        }
084      }
085
086    protected void collect( TupleEntry tupleEntry ) throws IOException
087      {
088      for( int i = 0; i < taps.length; i++ )
089        collectors[ i ].add( tupleEntry );
090      }
091
092    @Override
093    public void close()
094      {
095      super.close();
096
097      try
098        {
099        for( TupleEntryCollector collector : collectors )
100          {
101          try
102            {
103            collector.close();
104            }
105          catch( Exception exception )
106            {
107            LOG.warn( "exception closing TupleEntryCollector", exception );
108            }
109          }
110        }
111      finally
112        {
113        collectors = null;
114        }
115      }
116    }
117
118  /**
119   * Constructor MultiSinkTap creates a new MultiSinkTap instance.
120   *
121   * @param taps of type Tap...
122   */
123  @ConstructorProperties({"taps"})
124  public MultiSinkTap( Child... taps )
125    {
126    this.taps = taps;
127    }
128
129  protected Child[] getTaps()
130    {
131    return taps;
132    }
133
134  @Override
135  public Iterator<Child> getChildTaps()
136    {
137    return Arrays.asList( getTaps() ).iterator();
138    }
139
140  @Override
141  public long getNumChildTaps()
142    {
143    return getTaps().length;
144    }
145
146  @Override
147  public String getIdentifier()
148    {
149    return tempPath;
150    }
151
152  @Override
153  public void presentSinkFields( FlowProcess<? extends Config> flowProcess, Fields fields )
154    {
155    for( Tap child : getTaps() )
156      child.presentSinkFields( flowProcess, fields );
157    }
158
159  @Override
160  public TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess, Output output ) throws IOException
161    {
162    return new MultiSinkCollector( flowProcess, getTaps() );
163    }
164
165  @Override
166  public void sinkConfInit( FlowProcess<? extends Config> flowProcess, Config conf )
167    {
168    bridge( flowProcess, conf );
169    }
170
171  private void bridge( FlowProcess flowProcess, Object conf )
172    {
173    childConfigs = new ArrayList<>();
174
175    for( int i = 0; i < getTaps().length; i++ )
176      {
177      Tap tap = getTaps()[ i ];
178      Object newConfig = flowProcess.copyConfig( conf );
179
180      tap.sinkConfInit( flowProcess, newConfig );
181
182      childConfigs.add( flowProcess.diffConfigIntoMap( conf, newConfig ) );
183      }
184    }
185
186  @Override
187  public boolean createResource( Config conf ) throws IOException
188    {
189    for( Tap tap : getTaps() )
190      {
191      if( !tap.createResource( conf ) )
192        return false;
193      }
194
195    return true;
196    }
197
198  @Override
199  public boolean deleteResource( Config conf ) throws IOException
200    {
201    for( Tap tap : getTaps() )
202      {
203      if( !tap.deleteResource( conf ) )
204        return false;
205      }
206
207    return true;
208    }
209
210  @Override
211  public boolean commitResource( Config conf ) throws IOException
212    {
213    for( Tap tap : getTaps() )
214      {
215      if( !tap.commitResource( conf ) )
216        return false;
217      }
218
219    return true;
220    }
221
222  @Override
223  public boolean rollbackResource( Config conf ) throws IOException
224    {
225    for( Tap tap : getTaps() )
226      {
227      if( !tap.rollbackResource( conf ) )
228        return false;
229      }
230
231    return true;
232    }
233
234  @Override
235  public boolean resourceExists( Config conf ) throws IOException
236    {
237    for( Tap tap : getTaps() )
238      {
239      if( !tap.resourceExists( conf ) )
240        return false;
241      }
242
243    return true;
244    }
245
246  @Override
247  public long getModifiedTime( Config conf ) throws IOException
248    {
249    long modified = getTaps()[ 0 ].getModifiedTime( conf );
250
251    for( int i = 1; i < getTaps().length; i++ )
252      modified = Math.max( getTaps()[ i ].getModifiedTime( conf ), modified );
253
254    return modified;
255    }
256
257  @Override
258  public Scheme getScheme()
259    {
260    if( super.getScheme() != null )
261      return super.getScheme();
262
263    Set<Fields> fields = new HashSet<Fields>();
264
265    for( Tap child : getTaps() )
266      fields.add( child.getSinkFields() );
267
268    // if all schemes have the same sink fields, the just use the scheme
269    if( fields.size() == 1 )
270      {
271      setScheme( getTaps()[ 0 ].getScheme() );
272      return super.getScheme();
273      }
274
275    Fields allFields = Fields.merge( fields.toArray( new Fields[ fields.size() ] ) );
276
277    setScheme( new NullScheme( allFields, allFields ) );
278
279    return super.getScheme();
280    }
281
282  @Override
283  public String toString()
284    {
285    return "MultiSinkTap[" + ( taps == null ? "none" : Arrays.asList( taps ) ) + ']';
286    }
287
288  @Override
289  public boolean equals( Object o )
290    {
291    if( this == o )
292      return true;
293    if( !( o instanceof MultiSinkTap ) )
294      return false;
295    if( !super.equals( o ) )
296      return false;
297
298    MultiSinkTap that = (MultiSinkTap) o;
299
300    if( !Arrays.equals( taps, that.taps ) )
301      return false;
302
303    return true;
304    }
305
306  @Override
307  public int hashCode()
308    {
309    int result = super.hashCode();
310    result = 31 * result + ( taps != null ? Arrays.hashCode( taps ) : 0 );
311    return result;
312    }
313  }