001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.stream;
022    
023    /**
024     *
025     */
026    public abstract class Duct<Incoming, Outgoing>
027      {
028      protected Duct<Outgoing, ?> next;
029    
030      Duct()
031        {
032        }
033    
034      Duct( Duct<Outgoing, ?> next )
035        {
036        this.next = next;
037        }
038    
039      public Duct getNext()
040        {
041        return next;
042        }
043    
044      public void bind( StreamGraph streamGraph )
045        {
046        next = getNextFor( streamGraph );
047        }
048    
049      protected Duct getNextFor( StreamGraph streamGraph )
050        {
051        return streamGraph.createNextFor( this );
052        }
053    
054      /** Called immediately after bind */
055      public void initialize()
056        {
057        }
058    
059      /**
060       *
061       */
062      public void prepare()
063        {
064        // never chain prepare calls
065        }
066    
067      public final void receiveFirst( Incoming incoming )
068        {
069        receive( null, incoming );
070        }
071    
072      public void start( Duct previous )
073        {
074        next.start( this );
075        }
076    
077      public abstract void receive( Duct previous, Incoming incoming );
078    
079      public void complete( Duct previous )
080        {
081        next.complete( this );
082        }
083    
084      public void cleanup()
085        {
086        // never chain cleanup calls
087        }
088    
089      @Override
090      public String toString()
091        {
092        return getClass().getSimpleName();
093        }
094      }