001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.stream.element;
022
023import java.io.IOException;
024
025import cascading.CascadingException;
026import cascading.flow.FlowProcess;
027import cascading.flow.SliceCounters;
028import cascading.flow.StepCounters;
029import cascading.flow.stream.duct.Duct;
030import cascading.flow.stream.duct.DuctException;
031import cascading.flow.stream.graph.StreamGraph;
032import cascading.tap.Tap;
033import cascading.tuple.Fields;
034import cascading.tuple.TupleEntry;
035import cascading.tuple.TupleEntryCollector;
036
037/**
038 *
039 */
040public class SinkStage extends ElementStage<TupleEntry, Void>
041  {
042  private final Tap sink;
043  private TupleEntryCollector collector;
044
045  public SinkStage( FlowProcess flowProcess, Tap sink )
046    {
047    super( flowProcess, sink );
048    this.sink = sink;
049    }
050
051  public Tap getSink()
052    {
053    return sink;
054    }
055
056  @Override
057  public void bind( StreamGraph streamGraph )
058    {
059    // do not bind
060    }
061
062  @Override
063  public void prepare()
064    {
065    try
066      {
067      // todo: pass the resolved fields down
068      collector = sink.openForWrite( flowProcess, getOutput() );
069
070      if( sink.getSinkFields().isAll() )
071        {
072        Fields fields = getIncomingScopes().get( 0 ).getIncomingTapFields();
073        collector.setFields( fields );
074        }
075      }
076    catch( IOException exception )
077      {
078      throw new DuctException( "failed opening sink", exception );
079      }
080    }
081
082  protected Object getOutput()
083    {
084    return null;
085    }
086
087  @Override
088  public void start( Duct previous )
089    {
090    // do nothing
091    }
092
093  @Override
094  public void receive( Duct previous, TupleEntry tupleEntry )
095    {
096    try
097      {
098      collector.add( tupleEntry );
099      flowProcess.increment( StepCounters.Tuples_Written, 1 );
100      flowProcess.increment( SliceCounters.Tuples_Written, 1 );
101      }
102    catch( OutOfMemoryError error )
103      {
104      handleReThrowableException( "out of memory, try increasing task memory allocation", error );
105      }
106    catch( CascadingException exception )
107      {
108      handleException( exception, tupleEntry );
109      }
110    catch( Throwable throwable )
111      {
112      handleException( new DuctException( "internal error: " + tupleEntry.getTuple().print(), throwable ), tupleEntry );
113      }
114    }
115
116  @Override
117  public void complete( Duct previous )
118    {
119    // do nothing
120    }
121
122  @Override
123  public void cleanup()
124    {
125    try
126      {
127      if( collector != null )
128        collector.close();
129
130      collector = null;
131      }
132    finally
133      {
134      super.cleanup();
135      }
136    }
137  }