search - Elasticsearch Scala client: elastic4s

The previous article explained that Elasticsearch is a complete back-end system that is operated through client APIs. Elasticsearch provides clients for several programming languages, including a Java client (called esjava here). elastic4s is a Scala API built on top of esjava.

First, let's look at how the Scala client's ElasticClient is constructed:

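  import com.sksamuel.elastic4s.http.JavaClient
  import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
  import com.sksamuel.elastic4s.ElasticDsl._
  val esjava = JavaClient(ElasticProperties("http://localhost:9200"))
  val client = ElasticClient(esjava)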

The first step is to build the JavaClient. JavaClient wraps an esjava RestClient, which carries out the actual requests:

class JavaClient(client: RestClient) extends HttpClient {
...
//send request to elasticsearch
override def send(req: ElasticRequest, callback: Either[Throwable, HttpResponse] => Unit): Unit = {
    if (logger.isDebugEnabled) {
      logger.debug("Executing elastic request {}", Show[ElasticRequest].show(req))
    }

    val l = new ResponseListener {
      override def onSuccess(r: org.elasticsearch.client.Response): Unit = callback(Right(fromResponse(r)))
      override def onFailure(e: Exception): Unit = e match {
        case re: ResponseException => callback(Right(fromResponse(re.getResponse)))
        case t => callback(Left(JavaClientExceptionWrapper(t)))
      }
    }

    val request = new Request(req.method, req.endpoint)
    req.params.foreach { case (key, value) => request.addParameter(key, value) }
    req.entity.map(apacheEntity).foreach(request.setEntity)
//perform actual request sending
    client.performRequestAsync(request, l)
  }
...
}
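
Note that in the send method above, a ResponseException (an HTTP error status returned by ES) is still delivered as a Right, while only transport-level failures arrive as a Left. A callback of this shape can be adapted to a Future with a Promise. The following is a minimal sketch using only the Scala standard library; CallbackBridge, SimpleResponse and toFuture are hypothetical names for illustration, not part of elastic4s:

```scala
import scala.concurrent.{Future, Promise}

object CallbackBridge {
  // Hypothetical stand-in for elastic4s's HttpResponse; only what the sketch needs.
  final case class SimpleResponse(statusCode: Int, body: String)

  // Same shape as the callback parameter of JavaClient.send.
  type Callback = Either[Throwable, SimpleResponse] => Unit

  // Adapts any callback-style send function to a Future.
  def toFuture(send: Callback => Unit): Future[SimpleResponse] = {
    val promise = Promise[SimpleResponse]()
    send {
      case Right(response) => promise.success(response) // includes HTTP error statuses
      case Left(error)     => promise.failure(error)    // transport-level failure
    }
    promise.future
  }
}
```

With this bridge, a caller can inspect the status code on a successful Future and reserve Future failure handling for connection problems.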

The RestClient wrapped by JavaClient is the low-level Java REST client provided by Elasticsearch. elastic4s performs its actual operations through RestClient.performRequestAsync, shown here:

 

public class RestClient implements Closeable {
...
    /**
     * Sends a request to the Elasticsearch cluster that the client points to.
     * The request is executed asynchronously and the provided
     * {@link ResponseListener} gets notified upon request completion or
     * failure. Selects a host out of the provided ones in a round-robin
     * fashion. Failing hosts are marked dead and retried after a certain
     * amount of time (minimum 1 minute, maximum 30 minutes), depending on how
     * many times they previously failed (the more failures, the later they
     * will be retried). In case of failures all of the alive nodes (or dead
     * nodes that deserve a retry) are retried until one responds or none of
     * them does, in which case an {@link IOException} will be thrown.
     *
     * @param request the request to perform
     * @param responseListener the {@link ResponseListener} to notify when the
     *      request is completed or fails
     */
    public void performRequestAsync(Request request, ResponseListener responseListener) {
        try {
            FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(responseListener);
            InternalRequest internalRequest = new InternalRequest(request);
            performRequestAsync(nextNodes(), internalRequest, failureTrackingResponseListener);
        } catch (Exception e) {
            responseListener.onFailure(e);
        }
    }
...

}
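
The Javadoc above describes two ideas: hosts are tried in round-robin order, and a failing host waits longer before each retry (between 1 and 30 minutes). The sketch below illustrates both with the Scala standard library only. The doubling rule in retryDelay is an assumption made for illustration and is not taken from the RestClient source; DeadNodeBackoff, retryDelay and rotate are hypothetical names:

```scala
import scala.concurrent.duration._

object DeadNodeBackoff {
  val MinDelay: FiniteDuration = 1.minute
  val MaxDelay: FiniteDuration = 30.minutes

  // Delay before a node that has failed `failures` times in a row is retried.
  // Assumption: the delay doubles per failure; the real client may use another curve.
  def retryDelay(failures: Int): FiniteDuration = {
    require(failures >= 1, "a dead node has failed at least once")
    val doublings = math.min(failures - 1, 10) // cap the exponent to avoid overflow
    (MinDelay * (1L << doublings)).min(MaxDelay)
  }

  // Round-robin: each call starts from a different host, wrapping around.
  def rotate[A](hosts: Vector[A], offset: Int): Vector[A] =
    if (hosts.isEmpty) hosts
    else {
      val k = Math.floorMod(offset, hosts.size)
      hosts.drop(k) ++ hosts.take(k)
    }
}
```

For example, DeadNodeBackoff.rotate(Vector("a", "b", "c"), 1) starts the next request at "b", and a node with many consecutive failures is never kept out for more than MaxDelay.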

In addition, ElasticProperties holds the parameters for the connection between JavaClient and ES, including the node addresses:

/**
  * Contains the endpoints of the nodes to connect to, as well as connection properties.
  */
case class ElasticProperties(endpoints: Seq[ElasticNodeEndpoint], options: Map[String, String] = Map.empty)

ElasticProperties contains the ES node addresses (ElasticNodeEndpoint) and, where needed, other connection options. The following ScalaTest case shows how a connection string is split into endpoints:

 it should "support prefix path with trailing slash" in {
    ElasticProperties("https://host1:1234,host2:2345/prefix/path/") shouldBe
      ElasticProperties(Seq(ElasticNodeEndpoint("https", "host1", 1234, Some("/prefix/path")), ElasticNodeEndpoint("https", "host2", 2345, Some("/prefix/path"))))
  }
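
To make the parsing rule in that test concrete, here is a minimal standard-library sketch of how such a connection string could be split into endpoints. It is not the elastic4s implementation: Endpoint and EndpointParser are hypothetical names, and the sketch assumes every host is written as host:port:

```scala
// Hypothetical stand-in for elastic4s's ElasticNodeEndpoint; not the library class.
final case class Endpoint(protocol: String, host: String, port: Int, prefix: Option[String])

object EndpointParser {
  // Expected shape: protocol://host:port[,host:port...][/prefix]
  private val Pattern = "^(https?)://([^/?]+)(/[^?]*)?$".r

  def parse(uri: String): Seq[Endpoint] = uri match {
    case Pattern(protocol, hosts, path) =>
      // `path` is null when the optional prefix group did not match
      val prefix = Option(path).map(_.stripSuffix("/")).filter(_.nonEmpty)
      hosts.split(',').toSeq.map { hostPort =>
        hostPort.split(':') match {
          case Array(host, port) => Endpoint(protocol, host, port.toInt, prefix)
          case _ =>
            throw new IllegalArgumentException(s"Expected host:port but got '$hostPort'")
        }
      }
    case _ =>
      throw new IllegalArgumentException(s"Invalid connection string: $uri")
  }
}
```

The protocol and the prefix path are shared by all hosts in the list, and a trailing slash on the prefix is dropped, which matches the behaviour asserted in the test.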

Once elastic4s is connected to Elasticsearch, it can send JSON requests, composed as ES requires, to the ES back end for execution. elastic4s provides a DSL (an embedded language) that makes it easier to compose this request JSON programmatically. Users can also send raw JSON strings directly to ES through ElasticClient. Here is a simple, runnable elastic4s demonstration:

import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.requests.common.RefreshPolicy
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}

object HttpClientExampleApp extends App {

  // you must import the DSL to use the syntax helpers
  import com.sksamuel.elastic4s.ElasticDsl._
  val esjava =  JavaClient(ElasticProperties("http://localhost:9200"))
  val client = ElasticClient(esjava)


  // index two documents in one bulk request and wait until they are searchable
  client.execute {
    bulk(
      indexInto("books").fields("title" -> "Ten ways to eat Chongqing hot pot", "content" -> "In this book, we describe all kinds of cooking methods of hot pot"),
      indexInto("books").fields("title" -> "Chinese hot pot Daquan", "content" -> "This book is an introduction to hot pot in Chinese cuisine")
    ).refresh(RefreshPolicy.WaitFor)
  }.await

  val json =
    """
      |{
      |  "query" : {
      |    "match" : {"title" : "Hot Pot"}
      |  }
      |}
      |""".stripMargin
  val response = client.execute {
    // send the raw JSON query; DSL alternative: search("books").matchQuery("title", "Hot Pot")
    search("books").source(json)
  }.await

  // prints out the original json
  println(response.result.hits.hits.head.sourceAsString)

  client.close()

}

Tags: Scala JSON ElasticSearch Programming

Posted on Sat, 21 Mar 2020 07:37:19 -0700 by maskme