001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.stream.duct;
023
024import java.util.Iterator;
025
026/**
027 *
028 */
029public class OpenDuct<Incoming, Outgoing> extends Duct<Grouping<Incoming, Iterator<Incoming>>, Outgoing> implements OpenWindow
030  {
031  public OpenDuct( Duct<Outgoing, ?> next )
032    {
033    super( next );
034    }
035
036  @Override
037  public void start( Duct previous )
038    {
039    next.start( previous );
040    }
041
042  @Override
043  public void receive( Duct previous, int ordinal, Grouping<Incoming, Iterator<Incoming>> grouping )
044    {
045    while( grouping.joinIterator.hasNext() )
046      next.receive( previous, 0, (Outgoing) grouping.joinIterator.next() );
047    }
048
049  @Override
050  public void complete( Duct previous )
051    {
052    next.complete( previous );
053    }
054  }