Alink Rambling (1): Examining Alink's Design Ideas Through the Different Implementations of the KMeans Algorithm

0x00 Summary

Alink is a new-generation machine learning algorithm platform developed by Alibaba on top of Flink, a real-time computing engine. It is the first machine learning platform in the industry that supports both batch and streaming algorithms. This article analyzes and speculates about Alink's design ideas from multiple perspectives.

Because there is very little public information about Alink, what follows is largely my own speculation, and there are bound to be omissions. I hope readers will point them out, and I will update the article at any time.

0x01 What is Flink

Apache Flink is an open-source stream processing framework developed by the Apache Software Foundation. It implements the Google Dataflow streaming computation model, achieving a high-throughput, low-latency, high-performance real-time stream computing framework.

Its core is a distributed dataflow engine written in Java and Scala. Flink executes arbitrary dataflow programs in a data-parallel and pipelined manner, and its pipelined runtime system can execute both batch and stream processing programs. In addition, Flink's runtime natively supports the execution of iterative algorithms.

0x02 What is Alink

Alink is a new-generation machine learning algorithm platform that the PAI team of Alibaba's Computing Platform business unit has been developing since 2017 on top of Flink, a real-time computing engine. It provides a rich library of algorithm components and a convenient operating framework, with which developers can build the whole algorithm-model workflow covering data processing, feature engineering, model training and model prediction with one click. The name Alink is taken from the parts shared by the related names (Alibaba, Algorithm, AI, Flink, Blink).

Thanks to Flink's strengths in unified batch/stream processing, Alink can offer consistent operations for batch and streaming tasks. In early 2017, the Alibaba team recognized, through its research, Flink's advantage in unifying batch and stream processing and the excellent performance of its underlying engine, so it redesigned and developed a machine learning algorithm library on top of Flink, namely the Alink platform. The platform was launched inside Alibaba Group in 2018 and has since been continuously improved, maturing in Alibaba's complex internal business scenarios.

0x03 Alink's design ideas

At present there is little public material about Alink's design; all we have is the source code, so it seems the only way is to reason backward from the code. But nothing in the world exists in isolation: we have other perspectives to help us judge and reason. So let's extrapolate.

1. Starting from scratch

Flink ML is the existing machine learning library in the Flink community. It has been around for a long time and is updated slowly.

The Alink team initially faced a choice: develop on top of the existing Flink ML, or redesign from scratch?

After investigation, the Alink team found that Flink ML supported only about a dozen algorithms, that the data structures it supported were not general enough, that little algorithm-performance optimization had been done, and that its code had not been updated for a long time. They therefore abandoned the idea of improving and upgrading the old Flink ML and decided to redesign and develop a machine learning algorithm library on top of Flink.

So what we need to analyze is: how to design a new machine learning platform/framework from scratch.

2. The threat of substitutes

As a new entrant to the market, Alink's biggest problem is how to displace the products already on it.

Michael Porter's notion of "the threat of substitutes" explains the logic by which users switch products. When a new product firmly grasps this point, it has a chance to perform very well in the market and defeat its competitors.

If we want to build a machine learning library or framework from 0 to 1, we need to think about the product's value starting from business awareness and business logic. Then we can define the product more precisely and determine its roadmap.

The product needs to solve the overall problems of its application environment. Its value can be considered along three dimensions:

  • From the user's perspective: value is reflected in the user's willingness to adopt the product. This is a question of switching cost: once the switching cost is too high, the product can hardly succeed.
  • From the competitor's perspective: a product's competitiveness is ultimately reflected in the maximum cost ceiling users are willing to pay to obtain it. When a substitute enters the market, there must be a gap large enough to drive users to switch.
  • From the enterprise's perspective: this is really a question of cost structure and scale of revenue.

Let's analyze them one by one.

3. Design from the user's perspective

This is the question of switching cost: once the switching cost is too high, the product can hardly succeed.

Alink has two kinds of users: algorithm engineers and application engineers.

Alink algorithm engineers are the engineers who implement machine learning algorithms on Alink; Alink application engineers are the engineers who apply Alink's AI algorithms to business. Alink must consider the switching cost of both kinds of users.

A new product poses two big problems for users: its underlying logic and its development tools. An excellent new product must not increase the user's switching cost on either front.

Underlying logic: Flink

Flink is a broad and deep platform. Becoming familiar with its APIs, let alone understanding its architecture in depth, is not easy. If Alink's users also had to be familiar with Flink, that would inevitably raise their switching cost, so this should be avoided as much as possible.

  • Algorithm engineers should focus mainly on the algorithms and care as little as possible about Flink's internal details. If they must learn some Flink, the less, the better.

  • Application engineers mainly want the API to be as simple as possible. Their ideal state is to not feel Flink's presence at all.

To sum up, one of Alink's principles should be: render the algorithms unto the algorithms and Flink unto Flink, shielding the AI algorithms from Flink as much as possible.

Development tools

By development tools I mainly mean: which language to develop in. Flink's development languages are mainly Java, Scala and Python. In the machine learning world, Python is the dominant one.

  • First, rule out Scala. Scala is a very difficult language to master: its rules are grounded in mathematical type theory, and the learning curve is quite steep. A good programmer who understands its rules and language features will be more productive in Scala than in Java; for an ordinary programmer, as far as getting features implemented goes, the opposite tends to hold.

    Take a look at the native Flink KMeans code in Scala below. Many readers will likely be confused after reading it.

        val finalCentroids = centroids.iterate(params.getInt("iterations", 10)) { currentCentroids =>
          val newCentroids = points
            .map(new SelectNearestCenter).withBroadcastSet(currentCentroids, "centroids")
            .map { x => (x._1, x._2, 1L) }.withForwardedFields("_1; _2")
            .groupBy(0)
            .reduce { (p1, p2) => (p1._1, p1._2.add(p2._2), p1._3 + p2._3) }.withForwardedFields("_1")
            .map { x => new Centroid(x._1, x._2.div(x._3)) }.withForwardedFields("_1->id")
          newCentroids
        }
    
  • Second, choose Java or Python for implementing the algorithms themselves. There must have been plenty of trade-offs and pragmatism inside the Alink team, because this is not only about which language fits better, but also about what resources the team has, e.g. Java engineers versus Python engineers. In the end, Alink chose Java to implement its algorithms.

  • Finally, the API. There is no doubt here: Alink provides APIs in both Python and Java. See the introduction on GitHub.

In PyAlink, the interface offered by an algorithm component is essentially the same as in the Java API: create the component with its default constructor, set parameters through setXXX methods, and connect it to other components via link/linkTo/linkFrom. Jupyter's auto-completion makes this convenient to write. A hedged Java sketch of the same wiring pattern follows.
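As a rough illustration of this construct-set-link pattern on the Java side (the file path and schema are hypothetical placeholders; the component and parameter names follow Alink's published API, but treat the details as a sketch rather than a definitive recipe):

import com.alibaba.alink.operator.batch.clustering.KMeansTrainBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;

public class LinkPatternSketch {
	public static void main(String[] args) throws Exception {
		// Create a component with its default constructor, then set
		// parameters via setXXX (path and schema are placeholders).
		CsvSourceBatchOp source = new CsvSourceBatchOp()
			.setFilePath("/path/to/data.csv")
			.setSchemaStr("vec string");

		KMeansTrainBatchOp kmeans = new KMeansTrainBatchOp()
			.setVectorCol("vec")
			.setK(3)
			.setMaxIter(10);

		// linkFrom wires the source's output into the trainer's input;
		// print() triggers execution and prints the resulting model rows.
		kmeans.linkFrom(source).print();
	}
}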

Moreover, with Java or Python there is a large body of existing code that can be adapted and reused. With Scala, it would be hard to reuse the earlier accumulation.

To sum up, one of Alink's principles should be: adopt the simplest, most widespread development language and way of thinking.

4. Design from the competitors' perspective

Alink's competitors can be taken to be Spark ML, Flink ML and scikit-learn.

They are the incumbents in the market, with large user bases. Users are familiar with these competitors' design ideas, development strategies, basic concepts and APIs. Unless Alink can offer some magically simple API, it should, in its design, learn from these competitors to the greatest extent.

For example, machine learning development commonly involves the following concepts: Transformer, Estimator, Pipeline, Parameter. Alink should try to provide these concepts.

To sum up, one of Alink's principles should be: follow the common design ideas and development patterns on the market as much as possible, so that developers can switch seamlessly.

From Alink's directory structure, we can see that Alink does provide these common concepts: for example, Pipeline, Trainer, Model and Estimator. We will introduce them in more detail in subsequent articles; a brief sketch of how they typically combine follows the listings below.

./java/com/alibaba/alink:
common		operator	params		pipeline
  
./java/com/alibaba/alink/params:
associationrule	evaluation	nlp		regression	statistics
classification	feature		onlinelearning	shared		tuning
clustering	io		outlier		similarity	udf
dataproc	mapper		recommendation	sql		validators

./java/com/alibaba/alink/pipeline:
EstimatorBase.java	ModelBase.java		Trainer.java		feature
LocalPredictable.java	ModelExporterUtils.java	TransformerBase.java	nlp
LocalPredictor.java	Pipeline.java		classification		recommendation
MapModel.java		PipelineModel.java	clustering		regression
MapTransformer.java	PipelineStageBase.java	dataproc		tuning
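
To give a flavor of how these concepts combine, here is a minimal, hedged sketch of the usual Estimator/Transformer pattern: an Estimator's fit produces a PipelineModel, which then transforms data. The KMeans pipeline component and its parameters follow the pipeline/clustering directory above; the CSV source and its schema are hypothetical placeholders.

import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;
import com.alibaba.alink.pipeline.Pipeline;
import com.alibaba.alink.pipeline.PipelineModel;
import com.alibaba.alink.pipeline.clustering.KMeans;

public class PipelineSketch {
	public static void main(String[] args) throws Exception {
		// Hypothetical input: one string column holding the feature vector.
		CsvSourceBatchOp data = new CsvSourceBatchOp()
			.setFilePath("/path/to/data.csv")
			.setSchemaStr("vec string");

		// KMeans here is an Estimator; Pipeline chains pipeline stages.
		Pipeline pipeline = new Pipeline()
			.add(new KMeans()
				.setVectorCol("vec")
				.setK(3)
				.setPredictionCol("cluster_id"));

		// fit() trains the pipeline; the returned PipelineModel acts as a
		// Transformer that annotates each row with its cluster id.
		PipelineModel model = pipeline.fit(data);
		model.transform(data).print();
	}
}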

5. Design from the enterprise's perspective

This is the question of cost structure and scale of revenue: Alink must try to improve the efficiency and productivity of its development engineers. This is part of the reason Scala was ruled out.

The challenges focus on:

  • How to make full use of Flink's capabilities even while shielding Flink to the greatest extent.
  • How to build a corresponding system of "tactics", i.e. middleware or adapters, so that users can quickly develop algorithms on top of it.

For instance:

  • Some developers are very familiar with Flink. They can wield all kinds of Flink APIs and functional-programming thinking to write efficient algorithms. Such developers are like Wu Song: special forces who can fight on the battlefield and also beat a tiger to death barehanded.

  • But most developers are not familiar with Flink; they know AI algorithms and imperative programming better. We can think of them as the 800,000 imperial guards, or the Xuanjia, Beifu, Weiwu and Beiwei armies. They are the main force and the routine in day-to-day development.

So we need to design a set of standard spear-and-staff drills for the 800,000 imperial guards; or, for the Beiwei army, have Marshal Yue Fei design a cavalry-charge doctrine.

Therefore, one of Alink's principles should be: build a set of such tactics (middleware or adapters) that shields Flink yet makes good use of it, and lets users develop algorithms quickly on top of it.

Let's think about what basic work needs to be done:

  • How to initialize
  • How to communicate
  • How to split the code, and how to broadcast it
  • How to split the data, and how to broadcast it
  • How to iterate the algorithm
  • ......

Let's see what efforts Alink has made. From its directory structure, we can see queues, operators, mappers and the other data structures needed to build such a framework:

./java/com/alibaba/alink/common:
MLEnvironment.java	MLEnvironmentFactory.java	VectorTypes.java
comqueue	io	linalg		mapper		model		utils

./java/com/alibaba/alink/operator:
AlgoOperator.java	batch	common	stream

The most important concept is BaseComQueue, which abstracts both communication and computation as ComQueueItems and chains ComQueueItems into a queue. This forms an iterative communication/computation framework for iterative computing scenarios. Most of the other data structures revolve around BaseComQueue.

/**
 * Base class for the com(Computation && Communicate) queue.
 */
public class BaseComQueue<Q extends BaseComQueue<Q>> implements Serializable {
	/**
	 * All computation or communication functions.
	 */
	private final List<ComQueueItem> queue = new ArrayList<>();
	/**
	 * sessionId for shared objects within this BaseComQueue.
	 */
	private final int sessionId = SessionSharedObjs.getNewSessionId();
	/**
	 * The function executed to decide whether to break the loop.
	 */
	private CompareCriterionFunction compareCriterion;
	/**
	 * The function executed when closing the iteration
	 */
	private CompleteResultFunction completeResult;
	/**
	 * Max iteration count.
	 */
	private int maxIter = Integer.MAX_VALUE;

	private transient ExecutionEnvironment executionEnvironment;
}

MLEnvironment is another important class. It encapsulates the execution contexts needed for developing with Flink. Through this class, users can obtain the various execution environments, create Tables, and run SQL statements. A small usage sketch follows the class definition below.

/**
 * The MLEnvironment stores the necessary context in Flink.
 * Each MLEnvironment will be associated with a unique ID.
 * The operations associated with the same MLEnvironment ID
 * will share the same Flink job context.
 */
public class MLEnvironment {
    private ExecutionEnvironment env;
    private StreamExecutionEnvironment streamEnv;
    private BatchTableEnvironment batchTableEnv;
    private StreamTableEnvironment streamTableEnv;
}
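
A small, hedged usage sketch: the getters mirror the fields above, and I assume here that MLEnvironmentFactory (visible in the directory listing earlier) exposes a default environment:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.alibaba.alink.common.MLEnvironment;
import com.alibaba.alink.common.MLEnvironmentFactory;

public class MLEnvironmentSketch {
	public static void main(String[] args) {
		// Operators associated with the same MLEnvironment id share one
		// Flink job context; here we grab the default environment.
		MLEnvironment mlEnv = MLEnvironmentFactory.getDefault();

		ExecutionEnvironment batchEnv = mlEnv.getExecutionEnvironment();
		StreamExecutionEnvironment streamEnv = mlEnv.getStreamExecutionEnvironment();
	}
}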

6. Summary of design principles

Now we can summarize some of Alink's design principles:

  • Render the algorithms unto the algorithms and Flink unto Flink: shield the AI algorithms from Flink as much as possible.

  • Adopt the simplest, most widespread development language.

  • Follow the common design ideas and development patterns on the market as much as possible, so that developers can switch seamlessly.

  • Build a set of tactics (middleware or adapters) that shields Flink yet makes good use of it, and lets users develop algorithms quickly on top of it.

0x04 KMeans algorithm implementation and design

Both Flink's and Alink's source trees provide example implementations of the KMeans algorithm, so let's start from KMeans to see how the native Flink implementation differs from the Alink one. For a uniform comparison, we use the Java version of both.

1. KMeans algorithm

The idea behind KMeans is fairly simple. Suppose we want to divide the data into K classes; roughly, the steps are as follows (a minimal stand-alone sketch of them follows the list):

  • Randomly select k points as cluster centers;
  • Compute each point's distance to the k cluster centers and assign it to the nearest one, thereby forming k clusters;
  • Recompute the centroid (mean) of each cluster;
  • Repeat steps 2-3 until the centroids no longer move or the configured number of iterations is reached.
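
To make the steps concrete, here is a minimal, self-contained Java sketch of KMeans on 2-D points. It is independent of Flink and Alink, and all names in it are illustrative.

import java.util.Random;

// A bare-bones KMeans on 2-D points, mirroring the four steps above.
public class SimpleKMeans {
	public static double[][] fit(double[][] points, int k, int maxIter) {
		Random rnd = new Random(42);
		double[][] centroids = new double[k][];
		// Step 1: randomly pick k points as the initial cluster centers.
		for (int c = 0; c < k; c++) {
			centroids[c] = points[rnd.nextInt(points.length)].clone();
		}
		int[] assignment = new int[points.length];
		for (int iter = 0; iter < maxIter; iter++) {
			// Step 2: assign every point to its nearest center.
			for (int p = 0; p < points.length; p++) {
				double best = Double.MAX_VALUE;
				for (int c = 0; c < k; c++) {
					double dx = points[p][0] - centroids[c][0];
					double dy = points[p][1] - centroids[c][1];
					double d = dx * dx + dy * dy;
					if (d < best) {
						best = d;
						assignment[p] = c;
					}
				}
			}
			// Step 3: recompute every center as the mean of its cluster.
			double[][] sums = new double[k][2];
			int[] counts = new int[k];
			for (int p = 0; p < points.length; p++) {
				sums[assignment[p]][0] += points[p][0];
				sums[assignment[p]][1] += points[p][1];
				counts[assignment[p]]++;
			}
			// Step 4: stop when no center moves (or maxIter is reached).
			boolean moved = false;
			for (int c = 0; c < k; c++) {
				if (counts[c] == 0) {
					continue; // leave an empty cluster's center unchanged
				}
				double nx = sums[c][0] / counts[c];
				double ny = sums[c][1] / counts[c];
				moved |= nx != centroids[c][0] || ny != centroids[c][1];
				centroids[c][0] = nx;
				centroids[c][1] = ny;
			}
			if (!moved) {
				break;
			}
		}
		return centroids;
	}
}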

2. Flink KMeans example

K-Means is an iterative clustering algorithm. It starts from K initial cluster centers:

  1. In each iteration, the algorithm computes the Euclidean distance from each data point to each cluster center;
  2. Each point is assigned to the cluster center nearest to it;
  3. Each cluster center is then moved to the mean of all points assigned to it;
  4. The moved cluster centers are fed into the next iteration.

The algorithm terminates after a fixed number of iterations (set by a parameter in this implementation) or when the cluster centers stop moving in an iteration. The example works on data points in the two-dimensional plane: it assigns the data points to cluster centers, and each data point ends up annotated with the id of the final cluster (center) it belongs to.

Some of the code is given below; detailed explanations of the algorithm can be found in the comments.

The example mainly uses Flink's bulk iteration. It calls DataSet's iterate(int) method to create a BulkIteration, which serves as the starting point of the iteration and returns an IterativeDataSet that can be transformed with the normal operators. The int argument of the iterate call specifies the maximum number of iterations.

The IterativeDataSet's closeWith(DataSet) method specifies which transformation should be fed back to the next iteration. Optionally, closeWith(DataSet, DataSet) also takes a termination criterion: the second DataSet is evaluated in each iteration, and the iteration terminates once it is empty. If no termination criterion is specified, the iteration ends after the given maximum number of iterations. A hedged sketch of the two-argument variant follows.
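
For instance, a sketch of the two-argument closeWith, built on the same Point/Centroid POJOs as the example below (the movement threshold and the join-on-id logic are illustrative; the example itself uses the one-argument form):

// Requires org.apache.flink.api.common.functions.FilterFunction in addition
// to the imports of the example below; "loop" and "newCentroids" are the
// DataSets defined there.
DataSet<Tuple2<Centroid, Centroid>> pairs =
	newCentroids.join(loop).where("id").equalTo("id");

DataSet<Tuple2<Centroid, Centroid>> stillMoving = pairs
	.filter(new FilterFunction<Tuple2<Centroid, Centroid>>() {
		@Override
		public boolean filter(Tuple2<Centroid, Centroid> pair) {
			double dx = pair.f0.x - pair.f1.x;
			double dy = pair.f0.y - pair.f1.y;
			return dx * dx + dy * dy > 1e-9; // illustrative threshold
		}
	});

// The iteration stops early once stillMoving becomes empty.
DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids, stillMoving);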

public class KMeans {

	public static void main(String[] args) throws Exception {

		// Checking input parameters
		final ParameterTool params = ParameterTool.fromArgs(args);

		// set up execution environment
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.getConfig().setGlobalJobParameters(params); // make parameters available in the web interface

		// get input data:
		// read the points and centroids from the provided paths or fall back to default data
		DataSet<Point> points = getPointDataSet(params, env);
		DataSet<Centroid> centroids = getCentroidDataSet(params, env);

		// set number of bulk iterations for KMeans algorithm
		IterativeDataSet<Centroid> loop = centroids.iterate(params.getInt("iterations", 10));

		DataSet<Centroid> newCentroids = points
			// compute closest centroid for each point
			.map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
			// count and sum point coordinates for each centroid
			.map(new CountAppender())
			.groupBy(0).reduce(new CentroidAccumulator())
			// compute new centroids from point counts and coordinate sums
			.map(new CentroidAverager());

		// feed new centroids back into next iteration
		DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);

		DataSet<Tuple2<Integer, Point>> clusteredPoints = points
			// assign points to final clusters
			.map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");

		// emit result
		if (params.has("output")) {
			clusteredPoints.writeAsCsv(params.get("output"), "\n", " ");
			// since file sinks are lazy, we trigger the execution explicitly
			env.execute("KMeans Example");
		} else {
			System.out.println("Printing result to stdout. Use --output to specify output path.");
			clusteredPoints.print();
		}
	}
}
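
The helper functions referenced in main (SelectNearestCenter, CountAppender, CentroidAccumulator, CentroidAverager) are omitted from the listing above. Abbreviated versions, adapted from the same Flink example (they are nested static classes of KMeans and rely on the example's Point/Centroid POJOs plus java.util.Collection), look roughly like this:

/** Assigns each point to its nearest centroid, read from the "centroids" broadcast set. */
public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<Integer, Point>> {
	private Collection<Centroid> centroids;

	@Override
	public void open(Configuration parameters) {
		this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
	}

	@Override
	public Tuple2<Integer, Point> map(Point p) {
		double minDistance = Double.MAX_VALUE;
		int closestCentroidId = -1;
		for (Centroid centroid : centroids) {
			double distance = p.euclideanDistance(centroid);
			if (distance < minDistance) {
				minDistance = distance;
				closestCentroidId = centroid.id;
			}
		}
		return new Tuple2<>(closestCentroidId, p);
	}
}

/** Appends a count of 1 to each (centroidId, point) pair. */
public static final class CountAppender
		implements MapFunction<Tuple2<Integer, Point>, Tuple3<Integer, Point, Long>> {
	@Override
	public Tuple3<Integer, Point, Long> map(Tuple2<Integer, Point> t) {
		return new Tuple3<>(t.f0, t.f1, 1L);
	}
}

/** Sums coordinates and counts of the points belonging to the same centroid. */
public static final class CentroidAccumulator
		implements ReduceFunction<Tuple3<Integer, Point, Long>> {
	@Override
	public Tuple3<Integer, Point, Long> reduce(Tuple3<Integer, Point, Long> v1, Tuple3<Integer, Point, Long> v2) {
		return new Tuple3<>(v1.f0, v1.f1.add(v2.f1), v1.f2 + v2.f2);
	}
}

/** Divides coordinate sums by counts to obtain the new centroid. */
public static final class CentroidAverager
		implements MapFunction<Tuple3<Integer, Point, Long>, Centroid> {
	@Override
	public Centroid map(Tuple3<Integer, Point, Long> value) {
		return new Centroid(value.f0, value.f1.div(value.f2));
	}
}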

3. Alink KMeans example

In Alink, the KMeans implementation is spread across several files. Here we extract parts of the code for comparison.

KMeansTrainBatchOp

This is the main program of the algorithm. It looks very clean and simple, but it is not that simple: Alink has done a great deal of groundwork behind it.

We can see that the main work of the algorithm implementation is:

  • Build an IterativeComQueue (the default implementation of BaseComQueue).
  • Initialize the data in one of two ways: initWithPartitionedData caches the DataSet partition by partition in memory; initWithBroadcastData caches the whole DataSet in every worker's memory.
  • Split the computation into several ComputeFunctions, such as KMeansPreallocateCentroid / KMeansAssignCluster / KMeansUpdateCentroids, and chain them in the IterativeComQueue.
  • Synchronize data with the AllReduce communication model (a conceptual sketch of its semantics follows the code below).

public final class KMeansTrainBatchOp extends BatchOperator <KMeansTrainBatchOp>
	implements KMeansTrainParams <KMeansTrainBatchOp> {

	static DataSet <Row> iterateICQ(...ellipsis...) {

		return new IterativeComQueue()
			.initWithPartitionedData(TRAIN_DATA, data)
			.initWithBroadcastData(INIT_CENTROID, initCentroid)
			.initWithBroadcastData(KMEANS_STATISTICS, statistics)
			.add(new KMeansPreallocateCentroid())
			.add(new KMeansAssignCluster(distance))
			.add(new AllReduce(CENTROID_ALL_REDUCE))
			.add(new KMeansUpdateCentroids(distance))
			.setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol))
			.closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName))
			.setMaxIter(maxIter)
			.exec();
	}
}  
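
The AllReduce(CENTROID_ALL_REDUCE) step above merges the per-worker accumulation buffers: conceptually, every worker contributes its local double[] of per-cluster counts and coordinate sums, the arrays are summed element-wise, and the sum is handed back to every worker. A conceptual sketch of these semantics (not Alink's actual implementation, which runs distributed):

import java.util.List;

public class AllReduceSketch {
	// Element-wise sum over every worker's local buffer; in the real
	// framework, each worker afterwards sees this summed result.
	static double[] allReduceSum(List<double[]> perWorkerBuffers) {
		int n = perWorkerBuffers.get(0).length;
		double[] sum = new double[n];
		for (double[] buffer : perWorkerBuffers) {
			for (int i = 0; i < n; i++) {
				sum[i] += buffer[i];
			}
		}
		return sum;
	}
}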

KMeansPreallocateCentroid

Preallocates the cluster centers.

public class KMeansPreallocateCentroid extends ComputeFunction {
    public void calc(ComContext context) {
        if (context.getStepNo() == 1) {
            List<FastDistanceMatrixData> initCentroids = (List)context.getObj("initCentroid");
            List<Integer> list = (List)context.getObj("statistics");
            Integer vectorSize = (Integer)list.get(0);
            context.putObj("vectorSize", vectorSize);
            FastDistanceMatrixData centroid = (FastDistanceMatrixData)initCentroids.get(0);
            Preconditions.checkArgument(centroid.getVectors().numRows() == vectorSize, "Init centroid error, size not equal!");
            context.putObj("centroid1", Tuple2.of(context.getStepNo() - 1, centroid));
            context.putObj("centroid2", Tuple2.of(context.getStepNo() - 1, new FastDistanceMatrixData(centroid)));
            context.putObj("k", centroid.getVectors().numCols());
        }
    }
}

KMeansAssignCluster

Finds the nearest cluster center for each point, and accumulates per-center point counts and coordinate sums.

/**
 * Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster.
 */
public class KMeansAssignCluster extends ComputeFunction {
    // Fields reconstructed from the Alink source: the distance metric is
    // supplied at construction time; distanceMatrix is a scratch buffer.
    private final FastDistance fastDistance;
    private transient DenseMatrix distanceMatrix;

    public KMeansAssignCluster(FastDistance fastDistance) {
        this.fastDistance = fastDistance;
    }

    @Override
    public void calc(ComContext context) {

        Integer vectorSize = context.getObj(KMeansTrainBatchOp.VECTOR_SIZE);
        Integer k = context.getObj(KMeansTrainBatchOp.K);
        // Get the iteratively updated centroids from static memory.
        // CENTROID1/CENTROID2 act as ping-pong buffers: even steps read one,
        // odd steps the other, alternating between iterations.
        Tuple2<Integer, FastDistanceMatrixData> stepNumCentroids;
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        }

        if (null == distanceMatrix) {
            distanceMatrix = new DenseMatrix(k, 1);
        }

        double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE);
        if (sumMatrixData == null) {
            sumMatrixData = new double[k * (vectorSize + 1)];
            context.putObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE, sumMatrixData);
        }

        Iterable<FastDistanceVectorData> trainData = context.getObj(KMeansTrainBatchOp.TRAIN_DATA);
        if (trainData == null) {
            return;
        }

        Arrays.fill(sumMatrixData, 0.0);
        for (FastDistanceVectorData sample : trainData) {
            KMeansUtil.updateSumMatrix(sample, 1, stepNumCentroids.f1, vectorSize, sumMatrixData, k, fastDistance,
                distanceMatrix);
        }
    }
}

KMeansUpdateCentroids

Computes the new cluster centers from the point counts and coordinate sums.

/**
 * Update the centroids based on the sum of points and point number belonging to the same cluster.
 */
public class KMeansUpdateCentroids extends ComputeFunction {
    // Reconstructed from the Alink source: the same distance metric,
    // again supplied at construction time.
    private final FastDistance distance;

    public KMeansUpdateCentroids(FastDistance distance) {
        this.distance = distance;
    }

    @Override
    public void calc(ComContext context) {

        Integer vectorSize = context.getObj(KMeansTrainBatchOp.VECTOR_SIZE);
        Integer k = context.getObj(KMeansTrainBatchOp.K);
        double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE);

        Tuple2<Integer, FastDistanceMatrixData> stepNumCentroids;
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        }

        stepNumCentroids.f0 = context.getStepNo();
        context.putObj(KMeansTrainBatchOp.K,
            updateCentroids(stepNumCentroids.f1, k, vectorSize, sumMatrixData, distance));
    }
}

4. Differences

Amount of code

Comparing the two, the actual amount of business code does not differ much:

  • The Flink version has little code;
  • The Alink version has more code, but essentially it just splits the user-defined classes of the Flink version into separate classes of its own, plus a fair amount of code for reading environment variables.

So Alink's code is only slightly larger than Flink's native implementation.

Degree of coupling

This refers to coupling with Flink. Flink's KMeans implementation needs to import a large number of Flink classes, whereas Alink shields Flink to the greatest extent.

  • The Flink algorithm needs to import the following Flink classes:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
  • The Flink classes that the Alink algorithm needs to import are listed below. Alink touches only basic facilities and involves no operators or complex APIs, which lightens the user's burden:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

Programming paradigm

This is a major difference.

  • Flink uses functional programming. This paradigm is relatively new and unfamiliar to many engineers.
  • Alink sticks with imperative programming. The advantage is that large amounts of existing algorithm code can be reused, which better matches most engineers' habits.
  • Flink expresses operations through its various operators, such as IterativeDataSet. For engineers unfamiliar with Flink, implementing against these is a torment.
  • Alink organizes the computation code into several ComputeFunctions on top of its own framework, and drives the algorithm's iteration through the IterativeComQueue. Users therefore need no deep understanding of Flink.

In the next article, we will analyze and verify these design ideas from the source code.

0x05 References

The definition of a business model: what is a product

Analysis of the principles of the k-means clustering algorithm

Implementation of the Flink KMeans clustering algorithm

Introduction to Spark ML: Pipeline, DataFrame, Estimator, Transformer

Open source: the world's first unified batch/stream machine learning platform

With 2000+ GitHub stars, how does Alibaba Cloud's open-source Alink machine learning platform win the Double 11 data "game"?

Flink DataSet API
