/**
 * Function that reports whether a string contains the text "Los Angeles".
 */
class MyFilter implements SerializableFunction<String, Boolean> {

  /**
   * @param input the string to inspect
   * @return {@code true} when {@code input} contains the literal "Los Angeles",
   *     {@code false} otherwise
   */
  @Override
  public Boolean apply(String input) {
    final boolean mentionsCity = input.indexOf("Los Angeles") >= 0;
    return mentionsCity;
  }
}
// Bundle the three customer collections into one PCollectionList ...
PCollectionList<String> list =
    PCollectionList.of(pCustList1)
        .and(pCustList2)
        .and(pCustList3);
// ... and flatten that list into a single PCollection.
PCollection<String> merged = list.apply(Flatten.<String>pCollections());
// Divide the students into 10 partitions according to their percentile.
PCollectionList<Student> studentsByPercentile =
    students.apply(
        Partition.of(
            10,
            new PartitionFn<Student>() {
              @Override
              public int partitionFor(Student student, int numPartitions) {
                // The original comment documents the percentile as 0..99, which
                // keeps the computed index strictly below numPartitions.
                final int percentile = student.getPercentile();
                return percentile * numPartitions / 100;
              }
            }));
//1 Read the side-input source and turn every line into a KV<String,String>.
// NOTE: "TextIO..." is an elision carried over from the original snippet;
// substitute the real read transform before compiling.
Pipeline p = Pipeline.create();
PCollection<KV<String,String>> pSideInput = p.apply(TextIO...))
    .apply(ParDo.of(new DoFn<String, KV<String,String>>() {
      /**
       * Splits the incoming line on commas and emits the first two fields as a
       * key/value pair. A line with fewer than two fields raises
       * ArrayIndexOutOfBoundsException.
       */
      @ProcessElement
      public void process(ProcessContext c) {
        // Fixed: the array was declared with the non-existent type "Strring".
        String[] arr = c.element().split(",");
        // arr[0] is the key and arr[1] the value of the emitted pair.
        c.output(KV.of(arr[0], arr[1]));
      }
    }));
//2 Materialise the key/value collection as a Map-shaped PCollectionView.
PCollectionView<Map<String, String>> pMap =
    pSideInput.apply(View.<String, String>asMap());
//3 Attach the map view with withSideInputs and join each main record by key.
// NOTE: "TextIO..." is an elision carried over from the original snippet;
// substitute the real read transform before compiling.
PCollection<String> pMain = p.apply(TextIO...));
// Fixed: the factory is ParDo.of (the original wrote "Pardo.of").
pMain.apply(ParDo.of(new DoFn<String,String>() {
  /**
   * Looks up the first comma-separated field of the element in the side-input
   * map and emits the element with the looked-up value appended after a comma.
   * When the key is absent from the map, the appended text is "null".
   */
  @ProcessElement
  public void process(ProcessContext c) {
    Map<String, String> psideInputView = c.sideInput(pMap);
    String[] arr = c.element().split(",");
    String key = arr[0];
    String sideInputValue = psideInputView.get(key);
    c.output(c.element() + "," + sideInputValue);
  }
// Fixed: the method that registers side-input views is withSideInputs (plural).
}).withSideInputs(pMap));
// Apply the Distinct transform to pCustList; the element type is inferred from the target.
PCollection<String> uniqueCust = pCustList.apply(Distinct.create());
// Apply a global Count to pInput, then print each element of the resulting collection.
PCollection<Long> pLong = pInput.apply(Count.globally());
pLong.apply(
    ParDo.of(
        new DoFn<Long, Void>() {
          /** Writes the received count to standard output; nothing is emitted. */
          @ProcessElement
          public void processElement(ProcessContext c) {
            final Long total = c.element();
            System.out.println(total);
          }
        }));
//1 Read the input and turn every "key,value" line into a KV<String,Integer>.
// NOTE: "TextIO..." is an elision carried over from the original snippet;
// substitute the real read transform before compiling.
Pipeline p = Pipeline.create();
// Fixed: the values are emitted as Integer so that they match the
// GroupByKey.<String, Integer> step and the Iterable<Integer> summation below
// (the original emitted KV<String,String>, which does not type-check).
PCollection<KV<String,Integer>> pSideInput = p.apply(TextIO...))
    .apply(ParDo.of(new DoFn<String, KV<String,Integer>>() {
      /**
       * Splits the line on commas; the first field is the key and the second
       * field, trimmed and parsed as an integer, is the value. A non-numeric
       * second field raises NumberFormatException.
       */
      @ProcessElement
      public void process(ProcessContext c) {
        // Fixed: the array was declared with the non-existent type "Strring".
        String[] arr = c.element().split(",");
        c.output(KV.of(arr[0], Integer.valueOf(arr[1].trim())));
      }
    }));
//2 Group all values that share a key.
PCollection<KV<String,Iterable<Integer>>> pIterable =
    pSideInput.apply(GroupByKey.<String, Integer>create());
//3 Reduce every group to a "key,sum" output string.
PCollection<String> output = pIterable.apply(ParDo.of(new DoFn<KV<String, Iterable<Integer>>,String>() {
  /** Adds up the grouped values of one key and emits "key,sum". */
  @ProcessElement
  public void process(ProcessContext c) {
    String strKey = c.element().getKey();
    Iterable<Integer> vals = c.element().getValue();
    // A primitive accumulator avoids re-boxing on every iteration.
    int sum = 0;
    for (Integer value : vals) {
      sum += value;
    }
    c.output(strKey + "," + sum);
  }
}));
//1 Convert both input PCollections to KV first (see the GroupByKey example code).
//2 Create one TupleTag per input collection.
final TupleTag<String> orderTuple = new TupleTag<>();
final TupleTag<String> userTuple = new TupleTag<>();
//3 Combine the two keyed data sets using CoGroupByKey.
PCollection<KV<String, CoGbkResult>> result =
    KeyedPCollectionTuple
        .of(orderTuple, pOrderCollection)
        .and(userTuple, pUserCollection)
        .apply(CoGroupByKey.<String>create());
//4 Walk each CoGbkResult and build one output string per order/user pair.
PCollection<String> output =
    result.apply(
        ParDo.of(
            new DoFn<KV<String, CoGbkResult>, String>() {
              /**
               * Emits "key,order,user" for every combination of an order value
               * and a user value stored under the element's key. Nothing is
               * emitted when either side has no values.
               */
              @ProcessElement
              public void processElement(ProcessContext c) {
                final KV<String, CoGbkResult> entry = c.element();
                final String joinKey = entry.getKey();
                final CoGbkResult grouped = entry.getValue();
                final Iterable<String> orders = grouped.getAll(orderTuple);
                final Iterable<String> users = grouped.getAll(userTuple);
                for (String orderRow : orders) {
                  final String prefix = joinKey + "," + orderRow + ",";
                  for (String userRow : users) {
                    c.output(prefix + userRow);
                  }
                }
              }
            }));
// Every order is emitted: paired with each user, or with "null" when userTable
// is empty. orderTable is therefore the side that is always kept.
for (String order : orderTable) {
  if (!userTable.iterator().hasNext()) {
    // No user values: emit the order with a null placeholder.
    c.output(strKey + "," + order + "," + null);
    continue;
  }
  for (String user : userTable) {
    c.output(strKey + "," + order + "," + user);
  }
}
// Every user is emitted: paired with each order, or with three "null"
// placeholders when orderTable is empty. userTable drives the loop and
// orderTable is the optional side.
for (String user : userTable) {
  if (!orderTable.iterator().hasNext()) {
    // No order values: emit the user followed by the null placeholders.
    c.output(strKey + "," + user + ",null,null,null");
    continue;
  }
  for (String order : orderTable) {
    c.output(strKey + "," + user + "," + order);
  }
}
/**
 * Converts one comma-separated line into a Row built with {@code schema} and
 * emits it. A line equal to {@code HEADER} (ignoring case) is skipped.
 *
 * The first three fields are passed through as strings and the fourth is
 * parsed with {@code Double.valueOf}.
 */
public void processElement(ProcessContext c) {
  final String line = c.element();
  if (line.equalsIgnoreCase(HEADER)) {
    return;
  }
  final String[] fields = line.split(",");
  Row record =
      Row.withSchema(schema)
          .addValues(fields[0], fields[1], fields[2], Double.valueOf(fields[3]))
          .build();
  c.output(record);
}
// Apply a SqlTransform that runs a select-all query against PCOLLECTION to rowInput.
PCollection<Row> sqlInput =
    rowInput.apply(
        SqlTransform.query("select * from PCOLLECTION "));