001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.local.stream.graph;
023
024import java.util.Properties;
025
026import cascading.flow.FlowElement;
027import cascading.flow.FlowNode;
028import cascading.flow.FlowProcess;
029import cascading.flow.local.LocalFlowProcess;
030import cascading.flow.local.LocalFlowStep;
031import cascading.flow.local.stream.duct.ParallelFork;
032import cascading.flow.local.stream.element.LocalGroupByGate;
033import cascading.flow.local.stream.element.SyncMergeStage;
034import cascading.flow.stream.duct.Duct;
035import cascading.flow.stream.duct.Gate;
036import cascading.flow.stream.element.MemoryCoGroupGate;
037import cascading.flow.stream.element.SinkStage;
038import cascading.flow.stream.element.SourceStage;
039import cascading.flow.stream.graph.IORole;
040import cascading.flow.stream.graph.NodeStreamGraph;
041import cascading.pipe.CoGroup;
042import cascading.pipe.GroupBy;
043import cascading.pipe.Merge;
044import cascading.property.PropertyUtil;
045import cascading.tap.Tap;
046
047/**
048 *
049 */
050public class LocalStepStreamGraph extends NodeStreamGraph
051  {
052  private LocalFlowStep step;
053
054  public LocalStepStreamGraph( FlowProcess<Properties> flowProcess, LocalFlowStep step, FlowNode node )
055    {
056    super( flowProcess, node );
057    this.step = step;
058
059    buildGraph();
060    setTraps();
061    setScopes();
062
063    printGraph( node.getID(), "local", 0 );
064
065    bind();
066
067    printBoundGraph( node.getID(), "local", 0 );
068    }
069
070  protected void buildGraph()
071    {
072    for( Object rhsElement : node.getSourceTaps() )
073      {
074      Duct rhsDuct = new SourceStage( tapFlowProcess( (Tap) rhsElement ), (Tap) rhsElement );
075
076      addHead( rhsDuct );
077
078      handleDuct( (FlowElement) rhsElement, rhsDuct );
079      }
080    }
081
082  @Override
083  protected Duct createFork( Duct[] allNext )
084    {
085    return new ParallelFork( allNext );
086    }
087
088  protected Gate createCoGroupGate( CoGroup element, IORole role )
089    {
090    return new MemoryCoGroupGate( flowProcess, element );
091    }
092
093  protected Gate createGroupByGate( GroupBy element, IORole source )
094    {
095    return new LocalGroupByGate( flowProcess, element );
096    }
097
098  @Override
099  protected Duct createMergeStage( Merge merge, IORole both )
100    {
101    return new SyncMergeStage( flowProcess, merge );
102    }
103
104  @Override
105  protected SinkStage createSinkStage( Tap element )
106    {
107    return new SinkStage( tapFlowProcess( element ), element );
108    }
109
110  private LocalFlowProcess tapFlowProcess( Tap tap )
111    {
112    Properties defaultProperties = ( (LocalFlowProcess) flowProcess ).getConfig();
113    Properties tapProperties = step.getPropertiesMap().get( tap );
114
115    tapProperties = PropertyUtil.createProperties( tapProperties, defaultProperties );
116
117    return new LocalFlowProcess( (LocalFlowProcess) flowProcess, tapProperties );
118    }
119
120  }