Tuesday, May 1, 2012

Sample Code in Pig: Impressions & Clicks

REGISTER  /home/dm53/herbie/pricing/pig-jars/elephant-bird-2.0.5.jar;
REGISTER /home/dm53/herbie/pricing/pig-jars/google-collect-1.0.jar;
REGISTER /home/dm53/herbie/pricing/pig-jars/json-simple-1.1.jar;
REGISTER /home/dm53/herbie/pricing/pig-jars/piggybank.jar;
REGISTER /home/dm53/herbie/pricing/pig-jars/piggybank-0.3-amzn.jar;
REGISTER /home/dm53/herbie/pricing/pig-jars/JsonStringToBagOfMaps.jar;

DEFINE JSON2MAP com.twitter.elephantbird.pig.piggybank.JsonStringToMap();
DEFINE EXTRACT org.apache.pig.piggybank.evaluation.string.EXTRACT();
DEFINE DATE_TIME org.apache.pig.piggybank.evaluation.datetime.DATE_TIME();
DEFINE FORMAT_DT org.apache.pig.piggybank.evaluation.datetime.FORMAT_DT();

DEFINE JSON_TO_BAG_OF_MAPS dm.pigudfs.JsonStringToBagOfMaps();



--version 1

rawimps = LOAD '2012-04-20-20' USING TextLoader AS line:chararray;
imp = FOREACH rawimps GENERATE JSON2MAP(line);

impression = FOREACH imp GENERATE
(chararray) $0#'impression_id' AS impression_id,
JSON2MAP($0#'publisher_descriptor') AS publisher,
FLATTEN(JSON_TO_BAG_OF_MAPS($0#'served_rads')) AS rad,
(long) FORMAT_DT('YYYYMMddHH', DATE_TIME((long) $0#'logTimestampMs')) AS hour;

impression = FOREACH impression GENERATE  impression_id,  publisher#'site_id' as site_id, rad#'rad_url' as rad_url, hour;

filter_imp = FILTER impression by site_id == '4' AND rad_url matches 'http://www.ehow.com/.*';
grp_imp = GROUP filter_imp BY (impression_id, hour);
imp_count = FOREACH grp_imp GENERATE $0 as impression_id_hour, COUNT($1) as rad_count;



--generate click list
rawclicks =  LOAD '2012-04-20-20.click' USING TextLoader AS line:chararray;
click = FOREACH rawclicks GENERATE JSON2MAP(line);

click = FOREACH click GENERATE
--'click' AS type,
(chararray) $0#'impression_id' AS impression_id,
(long) FORMAT_DT('YYYYMMddHH', DATE_TIME((long) $0#'logTimestampMs')) AS hour;

grp_click = GROUP click BY (impression_id, hour);
click_count = FOREACH grp_click GENERATE $0 as impression_id_hour, COUNT($1) as rad_count;

--join the impression and clicks by impression_id and hour
jnd = join filter_imp_count by (impression_id_hour.$0, impression_id_hour.$1) left outer, click_count by (impression_id_hour.$0, impression_id_hour.$1);

grp_jnd = GROUP jnd BY filter_imp_count::impression_id_hour.$1;
result = FOREACH grp_jnd GENERATE $0, SUM($1.$1) as imp_count, SUM($1.$3) as click_count;

No comments:

Post a Comment