001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple;
022
023import java.io.Closeable;
024import java.io.Flushable;
025import java.io.IOException;
026
027import cascading.flow.FlowProcess;
028import cascading.scheme.ConcreteCall;
029import cascading.scheme.Scheme;
030import cascading.tap.Tap;
031import cascading.tap.TapException;
032
033/**
034 * Class TupleEntrySchemeCollector is a helper class for wrapping a {@link Scheme} instance, calling
035 * {@link Scheme#sink(cascading.flow.FlowProcess, cascading.scheme.SinkCall)} on every call to {@link #add(TupleEntry)}
036 * or {@link #add(Tuple)}.
037 * <p/>
038 * Use this class inside a custom {@link cascading.tap.Tap} when overriding the
039 * {@link cascading.tap.Tap#openForWrite(cascading.flow.FlowProcess)} method.
040 */
041public class TupleEntrySchemeCollector<Config, Output> extends TupleEntryCollector
042  {
043  private final FlowProcess<? extends Config> flowProcess;
044  private final Scheme scheme;
045  private String identifier;
046
047  protected final ConcreteCall<Object, Output> sinkCall;
048  private boolean prepared = false;
049
050  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Scheme scheme )
051    {
052    this( flowProcess, scheme, null, null );
053    }
054
055  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Scheme scheme, String identifier )
056    {
057    this( flowProcess, scheme, null, identifier );
058    }
059
060  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Scheme scheme, Output output )
061    {
062    this( flowProcess, scheme, output, null );
063    }
064
065  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Tap tap, Output output )
066    {
067    this( flowProcess, tap.getScheme(), output, tap.getIdentifier() );
068    }
069
070  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Scheme scheme, Output output, String identifier )
071    {
072    super( Fields.asDeclaration( scheme.getSinkFields() ) );
073    this.flowProcess = flowProcess;
074    this.scheme = scheme;
075    this.identifier = identifier;
076
077    this.sinkCall = new ConcreteCall();
078    this.sinkCall.setOutgoingEntry( this.tupleEntry ); // created in super ctor
079
080    if( output != null )
081      setOutput( output );
082    }
083
084  protected FlowProcess<? extends Config> getFlowProcess()
085    {
086    return flowProcess;
087    }
088
089  @Override
090  public void setFields( Fields declared )
091    {
092    super.setFields( declared );
093
094    if( this.sinkCall != null )
095      this.sinkCall.setOutgoingEntry( this.tupleEntry );
096    }
097
098  protected Output getOutput()
099    {
100    return sinkCall.getOutput();
101    }
102
103  protected void setOutput( Output output )
104    {
105    sinkCall.setOutput( wrapOutput( output ) );
106    }
107
108  protected Output wrapOutput( Output output )
109    {
110    return output;
111    }
112
113  /** Need to defer preparing the scheme till after the fields have been resolved */
114  protected void prepare()
115    {
116    try
117      {
118      scheme.sinkPrepare( flowProcess, sinkCall );
119      }
120    catch( IOException exception )
121      {
122      throw new TapException( "could not prepare scheme", exception );
123      }
124
125    prepared = true;
126    }
127
128  @Override
129  public void add( TupleEntry tupleEntry )
130    {
131    if( !prepared )
132      prepare();
133
134    super.add( tupleEntry );
135    }
136
137  @Override
138  public void add( Tuple tuple )
139    {
140    if( !prepared ) // this is unfortunate
141      prepare();
142
143    super.add( tuple );
144    }
145
146  @Override
147  protected void collect( TupleEntry tupleEntry ) throws IOException
148    {
149    sinkCall.setOutgoingEntry( tupleEntry );
150
151    try
152      {
153      scheme.sink( flowProcess, sinkCall );
154      }
155    catch( Exception exception )
156      {
157      if( identifier == null || identifier.isEmpty() )
158        identifier = "'unknown'";
159
160      throw new TupleException( "unable to sink into output identifier: " + identifier, exception );
161      }
162    }
163
164  @Override
165  public void close()
166    {
167    try
168      {
169      if( sinkCall == null )
170        return;
171
172      try
173        {
174        if( prepared )
175          scheme.sinkCleanup( flowProcess, sinkCall );
176        }
177      catch( IOException exception )
178        {
179        throw new TupleException( "unable to cleanup sink for output identifier: " + identifier, exception );
180        }
181      }
182    finally
183      {
184      try
185        {
186        if( getOutput() instanceof Flushable )
187          ( (Flushable) getOutput() ).flush();
188        }
189      catch( IOException exception )
190        {
191        // do nothing
192        }
193
194      try
195        {
196        if( getOutput() instanceof Closeable )
197          ( (Closeable) getOutput() ).close();
198        }
199      catch( IOException exception )
200        {
201        // do nothing
202        }
203
204      super.close();
205      }
206    }
207  }