001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.operation.assertion; 022 023import java.beans.ConstructorProperties; 024import java.util.regex.Matcher; 025import java.util.regex.Pattern; 026 027import cascading.flow.FlowProcess; 028import cascading.management.annotation.Property; 029import cascading.management.annotation.PropertyDescription; 030import cascading.management.annotation.Visibility; 031import cascading.operation.GroupAssertion; 032import cascading.operation.GroupAssertionCall; 033import cascading.operation.OperationCall; 034import cascading.tuple.Tuple; 035import cascading.tuple.TupleEntry; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039/** 040 * 041 */ 042public abstract class AssertGroupBase extends BaseAssertion<AssertGroupBase.Context> implements GroupAssertion<AssertGroupBase.Context> 043 { 044 /** Field LOG */ 045 private static final Logger LOG = LoggerFactory.getLogger( AssertGroupBase.class ); 046 047 /** Field patternString */ 048 protected String patternString; 049 050 /** Field size */ 051 protected final long size; 052 053 public static class Context 054 { 055 Pattern pattern; 056 Long count; 057 String fields; 058 String group; 059 060 public Context set( long count, String fields, String group ) 061 { 062 this.count = count; 063 this.fields = fields; 064 this.group = group; 065 066 return this; 067 } 068 069 public Context reset() 070 { 071 count = null; 072 fields = null; 073 group = null; 074 075 return this; 076 } 077 } 078 079 @ConstructorProperties({"message", "size"}) 080 public AssertGroupBase( String message, long size ) 081 { 082 super( message ); 083 this.size = size; 084 } 085 086 @ConstructorProperties({"message", "patternString", "size"}) 087 protected AssertGroupBase( String message, String patternString, long size ) 088 { 089 super( message ); 090 this.patternString = patternString; 091 this.size = size; 092 } 093 094 @Property(name = "patternString", visibility = Visibility.PRIVATE) 095 @PropertyDescription("The regular expression pattern string.") 096 public String getPatternString() 097 { 098 return patternString; 099 } 100 101 public long getSize() 102 { 103 return size; 104 } 105 106 private Pattern getPattern() 107 { 108 Pattern pattern; 109 110 if( patternString == null ) 111 pattern = Pattern.compile( ".*" ); 112 else 113 pattern = Pattern.compile( patternString ); 114 115 return pattern; 116 } 117 118 private boolean matchWholeTuple( Tuple input, Pattern pattern ) 119 { 120 if( patternString == null ) 121 return true; 122 123 Matcher matcher = pattern.matcher( input.toString( "\t", false ) ); 124 125 LOG.debug( "pattern: {}, matches: {}", pattern, matcher.matches() ); 126 127 return matcher.matches(); 128 } 129 130 @Override 131 public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall ) 132 { 133 operationCall.setContext( new Context() ); 134 operationCall.getContext().pattern = getPattern(); 135 } 136 137 @Override 138 public void start( FlowProcess flowProcess, GroupAssertionCall<Context> assertionCall ) 139 { 140 TupleEntry groupEntry = assertionCall.getGroup(); 141 Context context = assertionCall.getContext(); 142 143 // didn't match, so skip 144 if( !matchWholeTuple( groupEntry.getTuple(), context.pattern ) ) 145 context.reset(); 146 else 147 context.set( 0L, groupEntry.getFields().print(), groupEntry.getTuple().print() ); 148 } 149 150 @Override 151 public void aggregate( FlowProcess flowProcess, GroupAssertionCall<Context> assertionCall ) 152 { 153 Long groupSize = assertionCall.getContext().count; 154 155 // didn't match, so skip 156 if( groupSize != null ) 157 assertionCall.getContext().count += 1L; 158 } 159 160 @Override 161 public void doAssert( FlowProcess flowProcess, GroupAssertionCall<Context> assertionCall ) 162 { 163 Context context = assertionCall.getContext(); 164 Long groupSize = context.count; 165 166 if( groupSize == null ) // didn't match, so skip 167 return; 168 169 if( assertFails( groupSize ) ) 170 { 171 if( patternString == null ) 172 fail( groupSize, size, context.fields, context.group ); 173 else 174 fail( patternString, groupSize, size, context.fields, context.group ); 175 } 176 } 177 178 protected abstract boolean assertFails( Long groupSize ); 179 180 @Override 181 public boolean equals( Object object ) 182 { 183 if( this == object ) 184 return true; 185 if( !( object instanceof AssertGroupBase ) ) 186 return false; 187 if( !super.equals( object ) ) 188 return false; 189 190 AssertGroupBase that = (AssertGroupBase) object; 191 192 if( size != that.size ) 193 return false; 194 if( patternString != null ? !patternString.equals( that.patternString ) : that.patternString != null ) 195 return false; 196 197 return true; 198 } 199 200 @Override 201 public int hashCode() 202 { 203 int result = super.hashCode(); 204 result = 31 * result + ( patternString != null ? patternString.hashCode() : 0 ); 205 result = 31 * result + (int) ( size ^ size >>> 32 ); 206 return result; 207 } 208 }