001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple;
022    
023    import java.io.Closeable;
024    import java.io.Flushable;
025    import java.io.IOException;
026    
027    import cascading.flow.FlowProcess;
028    import cascading.scheme.ConcreteCall;
029    import cascading.scheme.Scheme;
030    import cascading.tap.Tap;
031    import cascading.tap.TapException;
032    
033    /**
034     * Class TupleEntrySchemeCollector is a helper class for wrapping a {@link Scheme} instance, calling
035     * {@link Scheme#sink(cascading.flow.FlowProcess, cascading.scheme.SinkCall)} on every call to {@link #add(TupleEntry)}
036     * or {@link #add(Tuple)}.
037     * <p/>
038     * Use this class inside a custom {@link cascading.tap.Tap} when overriding the
039     * {@link cascading.tap.Tap#openForWrite(cascading.flow.FlowProcess)} method.
040     */
041    public class TupleEntrySchemeCollector<Config, Output> extends TupleEntryCollector
042      {
043      private final FlowProcess<Config> flowProcess;
044      private final Scheme scheme;
045      private String identifier;
046    
047      protected final ConcreteCall<Object, Output> sinkCall;
048      private boolean prepared = false;
049    
050      public TupleEntrySchemeCollector( FlowProcess<Config> flowProcess, Scheme scheme )
051        {
052        this( flowProcess, scheme, null, null );
053        }
054    
055      public TupleEntrySchemeCollector( FlowProcess<Config> flowProcess, Scheme scheme, String identifier )
056        {
057        this( flowProcess, scheme, null, identifier );
058        }
059    
060      public TupleEntrySchemeCollector( FlowProcess<Config> flowProcess, Scheme scheme, Output output )
061        {
062        this( flowProcess, scheme, output, null );
063        }
064    
065      public TupleEntrySchemeCollector( FlowProcess<Config> flowProcess, Tap tap, Output output )
066        {
067        this( flowProcess, tap.getScheme(), output, tap.getIdentifier() );
068        }
069    
070      public TupleEntrySchemeCollector( FlowProcess<Config> flowProcess, Scheme scheme, Output output, String identifier )
071        {
072        super( Fields.asDeclaration( scheme.getSinkFields() ) );
073        this.flowProcess = flowProcess;
074        this.scheme = scheme;
075        this.identifier = identifier;
076    
077        this.sinkCall = new ConcreteCall();
078        this.sinkCall.setOutgoingEntry( this.tupleEntry ); // created in super ctor
079    
080        if( output != null )
081          setOutput( output );
082        }
083    
084      protected FlowProcess<Config> getFlowProcess()
085        {
086        return flowProcess;
087        }
088    
089      @Override
090      public void setFields( Fields declared )
091        {
092        super.setFields( declared );
093    
094        if( this.sinkCall != null )
095          this.sinkCall.setOutgoingEntry( this.tupleEntry );
096        }
097    
098      protected Output getOutput()
099        {
100        return sinkCall.getOutput();
101        }
102    
103      protected void setOutput( Output output )
104        {
105        sinkCall.setOutput( wrapOutput( output ) );
106        }
107    
108      protected Output wrapOutput( Output output )
109        {
110        return output;
111        }
112    
113      /** Need to defer preparing the scheme till after the fields have been resolved */
114      protected void prepare()
115        {
116        try
117          {
118          scheme.sinkPrepare( flowProcess, sinkCall );
119          }
120        catch( IOException exception )
121          {
122          throw new TapException( "could not prepare scheme", exception );
123          }
124    
125        prepared = true;
126        }
127    
128      @Override
129      public void add( TupleEntry tupleEntry )
130        {
131        if( !prepared )
132          prepare();
133    
134        super.add( tupleEntry );
135        }
136    
137      @Override
138      public void add( Tuple tuple )
139        {
140        if( !prepared ) // this is unfortunate
141          prepare();
142    
143        super.add( tuple );
144        }
145    
146      @Override
147      protected void collect( TupleEntry tupleEntry ) throws IOException
148        {
149        sinkCall.setOutgoingEntry( tupleEntry );
150    
151        try
152          {
153          scheme.sink( flowProcess, sinkCall );
154          }
155        catch( Exception exception )
156          {
157          if( identifier == null || identifier.isEmpty() )
158            identifier = "'unknown'";
159    
160          throw new TupleException( "unable to sink into output identifier: " + identifier, exception );
161          }
162        }
163    
164      @Override
165      public void close()
166        {
167        try
168          {
169          if( sinkCall == null )
170            return;
171    
172          try
173            {
174            if( prepared )
175              scheme.sinkCleanup( flowProcess, sinkCall );
176            }
177          catch( IOException exception )
178            {
179            throw new TupleException( "unable to cleanup sink for output identifier: " + identifier, exception );
180            }
181          }
182        finally
183          {
184          try
185            {
186            if( getOutput() instanceof Flushable )
187              ( (Flushable) getOutput() ).flush();
188            }
189          catch( IOException exception )
190            {
191            // do nothing
192            }
193    
194          try
195            {
196            if( getOutput() instanceof Closeable )
197              ( (Closeable) getOutput() ).close();
198            }
199          catch( IOException exception )
200            {
201            // do nothing
202            }
203    
204          super.close();
205          }
206        }
207      }