Page single-hop conversion rate module

1, Requirements analysis

1. Get the taskid from the spark-submit script submitted by the user, and use it to look up the parameters of the task.
2. Load the data within the specified date range, then use the target page flow to calculate the conversion rate of visits between pages.
For example, with targetPageFlow: 1,2,3,4 we calculate the conversion rates of the hops 1 -> 2, 2 -> 3 and 3 -> 4.
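
As a small illustration of how a page flow breaks down into the hops to be measured, here is a minimal plain-Java sketch (no Spark involved). The class name PageSlices and the method fromFlow are made up for this example; the "1_2" slice naming follows the convention used later in this post.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original project.
final class PageSlices {
    // Splits a comma-separated page flow into consecutive "from_to" slices.
    // Whitespace around the commas is tolerated.
    static List<String> fromFlow(String targetPageFlow) {
        String[] pages = targetPageFlow.trim().split("\\s*,\\s*");
        List<String> slices = new ArrayList<>();
        for (int i = 1; i < pages.length; i++) {
            slices.add(pages[i - 1] + "_" + pages[i]);
        }
        return slices;
    }

    public static void main(String[] args) {
        System.out.println(fromFlow("1,2,3,4")); // prints [1_2, 2_3, 3_4]
    }
}
```

A flow with n pages yields n - 1 slices, so a flow containing a single page produces no slice at all.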

2, Data format

The data is user access information:
date               String  date of the visit
user_id            Long    user id
session_id         String  id of the current access session
page_id            Long    id of the visited page
action_time        String  time of this visit
search_keyword     String  search keyword
click_category_id  Long    id of the clicked category
click_product_id   Long    id of the clicked product
order_category_ids String  ids of the ordered categories
order_product_ids  String  ids of the ordered products
pay_category_ids   String  ids of the paid categories
pay_product_ids    String  ids of the paid products
city_id            Long    city id

Here we only need the date, session_id and page_id fields (the code below also reads action_time to order the visits within a session).

3, Technical design and implementation

1. First, read the task parameters from the task table in MySQL to get the time range and the target page flow we need to process.
2. Load the user access data for that time range, then use the map operator to convert it into <sessionid, actionInfo> pairs.
3. Use groupByKey to aggregate the data of the same session. A single hop only counts when the two visits are consecutive within one session.
4. Write the page slice matching algorithm. The general idea is to splice the page ids from the task parameter into slices such as 1_2, 2_3, 3_4. For the data in each session, sort by time (the user's access order), splice each pair of consecutive page ids together, and compare the result with every target slice in a loop. Each match is added to the returned list. In fact, I think it would be better if the session data carried a lastPageId field: ordering by time is not fully accurate, and an explicit field would also be more convenient.
Then apply the countByKey operator to the generated RDD to get the total number of visits for each slice.

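// JDK and Spark imports used below. JSONObject, ParamUtils, Constants and DateUtils come from the project itself.
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Row;
import scala.Tuple2;

private static JavaPairRDD<String, Integer> generateAndMatchPageSplit(JavaSparkContext sc,
                                                                      JavaPairRDD<String, Iterable<Row>> sessionid2actionsRDD,
                                                                      JSONObject taskParam) {
    // Read the target page flow field from the task parameters
    String param = ParamUtils.getParam(taskParam, Constants.PARAM_TARGET_PAGE_FLOW);
    // Broadcast the value once per executor instead of shipping it with every task
    final Broadcast<String> taskParamBroadcast = sc.broadcast(param);
    return sessionid2actionsRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<String, Iterable<Row>>, String, Integer>() {
        // Spark 1.x signature; from Spark 2.0 on, call() returns an Iterator instead of an Iterable
        public Iterable<Tuple2<String, Integer>> call(Tuple2<String, Iterable<Row>> tuple) throws Exception {
            List<Tuple2<String, Integer>> list = new ArrayList<>();
            Iterator<Row> iterator = tuple._2.iterator();
            // Read the broadcast value through value() and split the page flow into an array
            String[] splitParam = taskParamBroadcast.value().split(",");
            // Copy the rows of this session into a list so that they can be sorted
            List<Row> rows = new ArrayList<>();
            while (iterator.hasNext()) {
                rows.add(iterator.next());
            }
            // Sort by action_time (column 4)
            Collections.sort(rows, new Comparator<Row>() {
                public int compare(Row o1, Row o2) {
                    Date actionTime1 = DateUtils.parseTime(o1.getString(4));
                    Date actionTime2 = DateUtils.parseTime(o2.getString(4));
                    // Long.compare avoids the overflow caused by casting a long difference to int
                    return Long.compare(actionTime1.getTime(), actionTime2.getTime());
                }
            });
            Long lastPageId = null;
            // Loop through the session data in chronological order
            for (Row row : rows) {
                Long pageId = row.getLong(3);
                // The first visit of a session has no predecessor, so there is no hop to emit yet
                if (lastPageId == null) {
                    lastPageId = pageId;
                    continue;
                }
                String pageSplit = lastPageId + "_" + pageId;
                // Compare the hop with every slice of the target page flow
                for (int i = 1; i < splitParam.length; i++) {
                    String targetPageSplit = splitParam[i - 1] + "_" + splitParam[i];
                    if (pageSplit.equals(targetPageSplit)) {
                        list.add(new Tuple2<>(pageSplit, 1));
                        break;
                    }
                }
                lastPageId = pageId;
            }
            return list;
        }
    });
}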
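
The per-session matching step can also be tried out without a Spark cluster. The following is a minimal plain-Java sketch of the same idea, under a few assumptions: Visit is a hypothetical stand-in for a Row that carries only page_id and an already parsed action time in milliseconds, and a HashSet lookup replaces the inner loop over the target slices. It is an illustration, not the code of the original project.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, Spark-free version of the matching logic.
final class SessionSliceMatcher {
    // Stand-in for a Row: only the two fields the matching needs.
    static final class Visit {
        final long pageId;
        final long actionTimeMillis;

        Visit(long pageId, long actionTimeMillis) {
            this.pageId = pageId;
            this.actionTimeMillis = actionTimeMillis;
        }
    }

    // Returns one entry for every consecutive page pair of the session
    // that is a slice of the target page flow.
    static List<String> match(List<Visit> session, String targetPageFlow) {
        String[] pages = targetPageFlow.split(",");
        Set<String> targets = new HashSet<>();
        for (int i = 1; i < pages.length; i++) {
            targets.add(pages[i - 1].trim() + "_" + pages[i].trim());
        }

        // Sort a copy by action time so the caller's list is left untouched.
        List<Visit> sorted = new ArrayList<>(session);
        sorted.sort(Comparator.comparingLong((Visit v) -> v.actionTimeMillis));

        List<String> hits = new ArrayList<>();
        Long lastPageId = null;
        for (Visit v : sorted) {
            if (lastPageId != null) {
                String slice = lastPageId + "_" + v.pageId;
                if (targets.contains(slice)) {
                    hits.add(slice);
                }
            }
            lastPageId = v.pageId;
        }
        return hits;
    }
}
```

For a session that visits pages 1, 2, 3, 5, 4 in that order and the target flow 1,2,3,4, only the hops 1_2 and 2_3 are returned: 3 -> 5 and 5 -> 4 are not target slices, and 3 -> 4 never happens as a direct hop.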

5. Next, we need the number of visits (PV) to the start page of the page flow. The slice 1_2 counts the hops from the page with pageid 1 to the page with pageid 2, but to turn that count into a rate we also need the number of visits to page 1. So we count the visits to the page with pageid 1 within the time range.

// Function here is org.apache.spark.api.java.function.Function
private static Long getStartPagePv(JSONObject taskParam, JavaPairRDD<String, Row> sessionid2action) {
    String param = ParamUtils.getParam(taskParam, Constants.PARAM_TARGET_PAGE_FLOW);
    // The first id in the page flow is the start page
    final Long startPage = Long.valueOf(param.split(",")[0].trim());
    JavaPairRDD<String, Row> filter = sessionid2action.filter(new Function<Tuple2<String, Row>, Boolean>() {
        public Boolean call(Tuple2<String, Row> v1) throws Exception {
            Row row = v1._2;
            Long pageId = row.getLong(3);
            return startPage.equals(pageId);
        }
    });
    // count() gives the number of visits to the start page in the selected time range
    return filter.count();
}

6. Calculate the conversion rate of each slice and store the results in the MySQL database.
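
The post does not show the code for this last step, so the following is only a sketch of one way the rates could be derived from the slice counts and the start page PV. It assumes that the first hop is divided by the start page PV and that every later hop is divided by the count of the hop before it; this is a common convention, but the post does not state which denominator it uses. The class ConversionRates and its method compute are hypothetical, and writing the result to MySQL is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the original project.
final class ConversionRates {
    // sliceCounts maps a slice such as "1_2" to its number of hops
    // (the kind of result countByKey would produce).
    static Map<String, Double> compute(String targetPageFlow,
                                       Map<String, Long> sliceCounts,
                                       long startPagePv) {
        String[] pages = targetPageFlow.split(",");
        // LinkedHashMap keeps the slices in page flow order.
        Map<String, Double> rates = new LinkedHashMap<>();
        double denominator = startPagePv;
        for (int i = 1; i < pages.length; i++) {
            String slice = pages[i - 1].trim() + "_" + pages[i].trim();
            long count = sliceCounts.getOrDefault(slice, 0L);
            // Guard against division by zero when nobody reached the previous step.
            double rate = denominator == 0 ? 0.0 : count / denominator;
            rates.put(slice, rate);
            // Assumption: the next hop is measured against this hop's count.
            denominator = count;
        }
        return rates;
    }
}
```

With a start page PV of 100 and counts of 50, 25 and 5 for 1_2, 2_3 and 3_4, this yields rates of 0.5, 0.5 and 0.2.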


Posted on Fri, 05 Jun 2020 00:03:52 -0700 by defunct