001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.cascade; 022 023import java.beans.ConstructorProperties; 024import java.util.Collection; 025import java.util.Map; 026 027import cascading.cascade.planner.FlowGraph; 028import cascading.cascade.planner.IdentifierGraph; 029import cascading.flow.Flow; 030import cascading.tap.Tap; 031import cascading.util.Util; 032 033import static cascading.cascade.CascadeDef.cascadeDef; 034 035/** 036 * Class CascadeConnector is used to construct a new {@link Cascade} instance from a collection of {@link cascading.flow.Flow} instance. 037 * <p/> 038 * Note order is not significant when adding passing Flow instances to the {@code connect} 039 * method. This connector will order them based on their dependencies, if any. 040 * <p/> 041 * One Flow is dependent on another if the first sinks (produces) output that the second Flow sources (consumes) as 042 * input. A sink and source are considered equivalent if the fully qualified identifier, typically {@link Tap#getFullIdentifier(Object)} 043 * from either are {@code equals()}. 044 * <p/> 045 * <p/> 046 * Note that checkpoint sink Taps from an upstream Flow may be the sources to downstream Flow instances. 047 * <p/> 048 * The {@link CascadeDef} is a convenience class for dynamically defining a Cascade that can be passed to the 049 * {@link CascadeConnector#connect(CascadeDef)} method. 050 * <p/> 051 * Use the {@link CascadeProps} fluent helper class to create global properties to pass to the CascadeConnector 052 * constructor. 053 * 054 * @see CascadeDef 055 * @see CascadeProps 056 */ 057public class CascadeConnector 058 { 059 /** Field properties */ 060 private Map<Object, Object> properties; 061 062 /** Constructor CascadeConnector creates a new CascadeConnector instance. */ 063 public CascadeConnector() 064 { 065 } 066 067 /** 068 * Constructor CascadeConnector creates a new CascadeConnector instance. 069 * 070 * @param properties of type Map<Object, Object> 071 */ 072 @ConstructorProperties({"properties"}) 073 public CascadeConnector( Map<Object, Object> properties ) 074 { 075 this.properties = properties; 076 } 077 078 /** 079 * Given any number of {@link cascading.flow.Flow} objects, it will connect them and return a new {@link Cascade} instance. The name 080 * of the Cascade is derived from the given Flow instances. 081 * 082 * @param flows of type Collection<Flow> 083 * @return Cascade 084 */ 085 public Cascade connect( Collection<Flow> flows ) 086 { 087 return connect( null, flows.toArray( new Flow[ flows.size() ] ) ); 088 } 089 090 /** 091 * Given any number of {@link cascading.flow.Flow} objects, it will connect them and return a new {@link Cascade} instance. 092 * 093 * @param name of type String 094 * @param flows of type Collection<Flow> 095 * @return Cascade 096 */ 097 public Cascade connect( String name, Collection<Flow> flows ) 098 { 099 return connect( name, flows.toArray( new Flow[ flows.size() ] ) ); 100 } 101 102 /** 103 * Given any number of {@link cascading.flow.Flow} objects, it will connect them and return a new {@link Cascade} instance. The name 104 * of the Cascade is derived from the given Flow instances. 105 * 106 * @param flows of type Flow 107 * @return Cascade 108 */ 109 public Cascade connect( Flow... flows ) 110 { 111 return connect( null, flows ); 112 } 113 114 /** 115 * Given any number of {@link cascading.flow.Flow} objects, it will connect them and return a new {@link Cascade} instance. 116 * 117 * @param name of type String 118 * @param flows of type Flow 119 * @return Cascade 120 */ 121 public Cascade connect( String name, Flow... flows ) 122 { 123 name = name == null ? makeName( flows ) : name; 124 125 CascadeDef cascadeDef = cascadeDef() 126 .setName( name ) 127 .addFlows( flows ); 128 129 return connect( cascadeDef ); 130 } 131 132 public Cascade connect( CascadeDef cascadeDef ) 133 { 134 IdentifierGraph identifierGraph = new IdentifierGraph( cascadeDef.getFlowsArray() ); 135 FlowGraph flowGraph = new FlowGraph( identifierGraph ); 136 137 return new BaseCascade( cascadeDef, properties, flowGraph, identifierGraph ); 138 } 139 140 private String makeName( Flow[] flows ) 141 { 142 String[] names = new String[ flows.length ]; 143 144 for( int i = 0; i < flows.length; i++ ) 145 names[ i ] = flows[ i ].getName(); 146 147 return Util.join( names, "+" ); 148 } 149 }