/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.operation.assertion;

import java.beans.ConstructorProperties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import cascading.flow.FlowProcess;
import cascading.operation.GroupAssertion;
import cascading.operation.GroupAssertionCall;
import cascading.operation.OperationCall;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class AssertGroupBase is the common base for group assertions that count the number of times
 * {@link #aggregate(FlowProcess, GroupAssertionCall)} is invoked for a grouping and then let the
 * subclass decide, via {@link #assertFails(Long)}, whether that count violates the configured
 * {@link #getSize() size}.
 * <p/>
 * When a pattern string is supplied, only groupings whose group tuple matches the regular expression
 * in full are counted and asserted; every other grouping is skipped. When no pattern string is
 * supplied, every grouping is counted.
 */
public abstract class AssertGroupBase extends BaseAssertion<AssertGroupBase.Context> implements GroupAssertion<AssertGroupBase.Context>
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( AssertGroupBase.class );

  /** Field patternString, the optional regular expression applied to the group tuple; may be null */
  protected String patternString;

  /** Field size, the value handed to the failure message alongside the observed group size */
  protected final long size;

  /**
   * Class Context carries the per-operation state: the compiled pattern plus the running count and
   * the printed fields/tuple of the grouping currently being processed.
   * <p/>
   * A null {@code count} marks the current grouping as "did not match the pattern, skip it".
   */
  public static class Context
    {
    Pattern pattern;
    Long count;
    String fields;
    String group;

    /**
     * Method set stores the state for a grouping that should be counted.
     *
     * @param count  of type long, the starting count
     * @param fields of type String, the printed group fields
     * @param group  of type String, the printed group tuple
     * @return this Context
     */
    public Context set( long count, String fields, String group )
      {
      this.count = count;
      this.fields = fields;
      this.group = group;

      return this;
      }

    /**
     * Method reset clears the count, fields and group so the current grouping is skipped. The
     * compiled pattern is left untouched.
     *
     * @return this Context
     */
    public Context reset()
      {
      count = null;
      fields = null;
      group = null;

      return this;
      }
    }

  /**
   * Constructor AssertGroupBase creates a new AssertGroupBase instance with no pattern string.
   *
   * @param message of type String, passed through to the super class constructor
   * @param size    of type long
   */
  @ConstructorProperties({"message", "size"})
  public AssertGroupBase( String message, long size )
    {
    super( message );
    this.size = size;
    }

  /**
   * Constructor AssertGroupBase creates a new AssertGroupBase instance restricted to groupings that
   * match the given regular expression.
   *
   * @param message       of type String, passed through to the super class constructor
   * @param patternString of type String, the regular expression matched against the group tuple
   * @param size          of type long
   */
  @ConstructorProperties({"message", "patternString", "size"})
  protected AssertGroupBase( String message, String patternString, long size )
    {
    super( message );
    this.patternString = patternString;
    this.size = size;
    }

  /**
   * Method getPatternString returns the patternString of this AssertGroupBase object.
   *
   * @return the patternString (type String), possibly null
   */
  public String getPatternString()
    {
    return patternString;
    }

  /**
   * Method getSize returns the size of this AssertGroupBase object.
   *
   * @return the size (type long)
   */
  public long getSize()
    {
    return size;
    }

  /**
   * Compiles the pattern stored in the context during prepare. Falls back to {@code ".*"} when no
   * pattern string was given.
   */
  private Pattern getPattern()
    {
    Pattern pattern;

    if( patternString == null )
      pattern = Pattern.compile( ".*" );
    else
      pattern = Pattern.compile( patternString );

    return pattern;
    }

  /**
   * Returns true when no pattern string is set, otherwise true only if the pattern matches the
   * entire string produced by {@code input.toString( "\t", false )}.
   */
  private boolean matchWholeTuple( Tuple input, Pattern pattern )
    {
    if( patternString == null )
      return true;

    Matcher matcher = pattern.matcher( input.toString( "\t", false ) );

    // evaluate the regex exactly once; the result feeds both the debug log and the return value
    boolean matches = matcher.matches();

    LOG.debug( "pattern: {}, matches: {}", pattern, matches );

    return matches;
    }

  /**
   * Method prepare installs a fresh {@link Context} holding the compiled pattern.
   *
   * @param flowProcess   of type FlowProcess
   * @param operationCall of type OperationCall
   */
  @Override
  public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall )
    {
    operationCall.setContext( new Context() );
    operationCall.getContext().pattern = getPattern();
    }

  /**
   * Method start initializes the context for a new grouping: the count starts at zero when the
   * group tuple matches, or the context is reset (count set to null) so the grouping is skipped.
   *
   * @param flowProcess   of type FlowProcess
   * @param assertionCall of type GroupAssertionCall
   */
  @Override
  public void start( FlowProcess flowProcess, GroupAssertionCall<Context> assertionCall )
    {
    TupleEntry groupEntry = assertionCall.getGroup();
    Context context = assertionCall.getContext();

    // didn't match, so skip
    if( !matchWholeTuple( groupEntry.getTuple(), context.pattern ) )
      context.reset();
    else
      context.set( 0L, groupEntry.getFields().print(), groupEntry.getTuple().print() );
    }

  /**
   * Method aggregate increments the running count by one, unless the current grouping was skipped.
   *
   * @param flowProcess   of type FlowProcess
   * @param assertionCall of type GroupAssertionCall
   */
  @Override
  public void aggregate( FlowProcess flowProcess, GroupAssertionCall<Context> assertionCall )
    {
    Long groupSize = assertionCall.getContext().count;

    // didn't match, so skip
    if( groupSize != null )
      assertionCall.getContext().count += 1L;
    }

  /**
   * Method doAssert hands the final count to {@link #assertFails(Long)} and, when it reports a
   * failure, calls {@code fail} (not defined in this class, presumably inherited from the super
   * class) with the count, the size and the printed group fields and tuple. The pattern string is
   * included in the arguments when one is set. Skipped groupings are ignored.
   *
   * @param flowProcess   of type FlowProcess
   * @param assertionCall of type GroupAssertionCall
   */
  @Override
  public void doAssert( FlowProcess flowProcess, GroupAssertionCall<Context> assertionCall )
    {
    Context context = assertionCall.getContext();
    Long groupSize = context.count;

    if( groupSize == null ) // didn't match, so skip
      return;

    if( assertFails( groupSize ) )
      {
      if( patternString == null )
        fail( groupSize, size, context.fields, context.group );
      else
        fail( patternString, groupSize, size, context.fields, context.group );
      }
    }

  /**
   * Method assertFails decides whether the observed group size constitutes an assertion failure.
   *
   * @param groupSize of type Long, the count accumulated for the current grouping; never null when
   *                  called from {@link #doAssert(FlowProcess, GroupAssertionCall)}
   * @return true if the assertion should fail
   */
  protected abstract boolean assertFails( Long groupSize );

  @Override
  public boolean equals( Object object )
    {
    if( this == object )
      return true;
    if( !( object instanceof AssertGroupBase ) )
      return false;
    if( !super.equals( object ) )
      return false;

    AssertGroupBase that = (AssertGroupBase) object;

    if( size != that.size )
      return false;
    if( patternString != null ? !patternString.equals( that.patternString ) : that.patternString != null )
      return false;

    return true;
    }

  @Override
  public int hashCode()
    {
    int result = super.hashCode();
    result = 31 * result + ( patternString != null ? patternString.hashCode() : 0 );
    // fold the upper and lower halves of the long size into an int
    result = 31 * result + (int) ( size ^ size >>> 32 );
    return result;
    }
  }