Pipeline Explained
Easegress offers a rich set of filters tailored for diverse use cases, including validation, request building, and more. At its core, the Pipeline serves as an integrated framework within Easegress, designed specifically to coordinate and sequence filters for processing requests.
- It supports calling multiple backend services for one input request.
- It supports conditional forward jumps between filters.
- The HTTPServer receives incoming traffic and routes it to one dedicated Pipeline in Easegress according to the HTTP header, path matching, etc.
Details
Sequence execution
- The basic model of Pipeline execution is a sequence. Filters are executed one by one in the order described by the flow field in the Pipeline’s spec.
        Request
           │
┌──────────┼──────────┐
│ Pipeline │          │
│          │          │
│  ┌───────┴───────┐  │
│  │requestAdaptor │  │
│  └───────┬───────┘  │
│          │          │
│          │          │
│  ┌───────▼───────┐  │
│  │     Proxy     │  │
│  └───────┬───────┘  │
│          │          │
└──────────┼──────────┘
           │
           ▼
        Response
$ echo '
name: pipeline-demo
kind: Pipeline
flow:
- filter: requestAdaptor
- filter: proxy
filters:
- name: requestAdaptor
  kind: RequestAdaptor
  header:
    set:
      X-Adapt-Key: goodplan
- name: proxy
  kind: Proxy
  pools:
  - servers:
    - url: http://127.0.0.1:9095
' | egctl create -f -
- The pipeline-demo above executes the requestAdaptor filter first, then proxy. As a result, the proxy filter forwards the request with the header X-Adapt-Key, which was set by requestAdaptor.
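The ordering can be pictured with a small Python sketch. It is only an illustration of the sequential model, not Easegress code: the functions `request_adaptor`, `proxy`, and `run_flow`, and the dict-based request, are hypothetical stand-ins.

```python
# Illustrative model only: filters are plain functions, the request is a dict.


def request_adaptor(request):
    # Mirrors `header.set` in the spec above.
    request["headers"]["X-Adapt-Key"] = "goodplan"
    return ""  # an empty result means "no failure"


def proxy(request):
    # Stand-in for forwarding: record the headers the backend would receive.
    request["forwarded_headers"] = dict(request["headers"])
    return ""


def run_flow(flow, request):
    """Run the filters one by one, in the order given by `flow`."""
    for filter_fn in flow:
        filter_fn(request)
    return request


result = run_flow([request_adaptor, proxy], {"headers": {}})
print(result["forwarded_headers"])
```

Because `request_adaptor` runs before `proxy`, the header is already present when the stand-in proxy records what it would forward.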
JumpIf
- An Easegress filter returns a string as its execution result. An empty string means the filter handled the request without failure. Otherwise, Easegress uses the non-empty result string as the basis for the JumpIf mechanism.
- The Pipeline supports the JumpIf mechanism[2]. We use it to skip the execution of chosen filters when something goes wrong.
            Request
               │
               │
┌──────────────┼─────────────────┐
│ Pipeline     │                 │
│              ▼                 │
│     ┌──────────────────┐       │
│     │    Validator     ├────┐  │
│     └────────┬─────────┘    │  │
│              │              │  │
│              ▼              │  │
│     ┌──────────────────┐    │  │
│     │  RequestAdaptor  │    │  │
│     └────────┬─────────┘    │  │
│              │          Invalid│
│              ▼              │  │
│     ┌──────────────────┐    │  │
│     │     Proxy        │    │  │
│     └────────┬─────────┘    │  │
│              │              │  │
│              ◄──────────────┘  │
└──────────────┼─────────────────┘
               │
               ▼
            Response
$ echo '
name: pipeline-demo
kind: Pipeline
flow:
- filter: validator
  jumpIf: { invalid: END }
- filter: requestAdaptor
- filter: proxy
filters:
- name: validator
  kind: Validator
  headers:
    Content-Type:
      values:
      - application/json
- name: requestAdaptor
  kind: RequestAdaptor
  header:
    set:
      X-Adapt-Key: goodplan
- name: proxy
  kind: Proxy
  pools:
  - servers:
    - url: http://127.0.0.1:9095
' | egctl apply -f -
- As we can see above, pipeline-demo will jump to the end of pipeline execution when validator’s execution result is invalid.
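The jump behavior can be modeled with a short Python sketch. This is a simplified illustration rather than the Easegress implementation: `run_flow` and the stand-in filter functions are hypothetical, and the sketch assumes that a non-empty result with no matching jumpIf entry ends the flow.

```python
END = "END"


def run_flow(flow, filters, request):
    """Execute flow entries in order, following forward jumps.

    flow:    list of dicts with "filter" and an optional "jumpIf" mapping.
    filters: filter name -> function returning a result string ("" = success).
    Returns the names of the filters that actually ran.
    """
    executed = []
    i = 0
    while i < len(flow):
        name = flow[i]["filter"]
        if name == END:
            break
        result = filters[name](request)
        executed.append(name)
        if result == "":
            i += 1
            continue
        # Assumption: an unmatched non-empty result ends the flow.
        target = flow[i].get("jumpIf", {}).get(result, END)
        if target == END:
            break
        # Jumps only go forward, so search the remaining entries.
        # An unknown target raises ValueError (an invalid spec).
        later = [flow[j]["filter"] for j in range(i + 1, len(flow))]
        i = i + 1 + later.index(target)
    return executed


def validator(request):
    ok = request["headers"].get("Content-Type") == "application/json"
    return "" if ok else "invalid"


def request_adaptor(request):
    request["headers"]["X-Adapt-Key"] = "goodplan"
    return ""


def proxy(request):
    return ""


FLOW = [
    {"filter": "validator", "jumpIf": {"invalid": END}},
    {"filter": "requestAdaptor"},
    {"filter": "proxy"},
]
FILTERS = {
    "validator": validator,
    "requestAdaptor": request_adaptor,
    "proxy": proxy,
}

print(run_flow(FLOW, FILTERS, {"headers": {"Content-Type": "application/json"}}))
print(run_flow(FLOW, FILTERS, {"headers": {"Content-Type": "text/plain"}}))
```

In this model the valid request runs all three filters, while the request with the wrong Content-Type stops right after the validator.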
Built-in Filter END
END is a built-in filter, which means we can use it without defining it. From the above example, we have already seen that END can be used as the target of jumpIf; we can also use it directly in the flow.
$ echo '
name: pipeline-demo
kind: Pipeline
flow:
- filter: validator
  jumpIf: { invalid: buildFailureResponse }
- filter: requestAdaptor
- filter: proxy
- filter: END
- filter: buildFailureResponse
filters:
- name: validator
  kind: Validator
  headers:
    Content-Type:
      values:
      - application/json
- name: requestAdaptor
  kind: RequestAdaptor
  header:
    set:
      X-Adapt-Key: goodplan
- name: proxy
  kind: Proxy
  pools:
  - servers:
    - url: http://127.0.0.1:9095
- name: buildFailureResponse
  kind: ResponseBuilder
  template: |
    statusCode: 400
    body: the request is invalid.
' | egctl apply -f -
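- By using the END filter, it is now possible to build a custom failure response: valid requests stop at END after proxy, while invalid requests jump past it to buildFailureResponse.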
Alias
- We assign a name to a filter when we define it, but a filter can be used more than once in the flow. In this case, we can assign an alias to each appearance.
 
$ echo '
name: pipeline-demo
kind: Pipeline
flow:
- filter: validator
  jumpIf:
    invalid: mockRequestForProxy2
- filter: mockRequest
- filter: proxy1
- filter: END
- filter: mockRequest
  alias: mockRequestForProxy2
- filter: proxy2
filters:
- name: validator
  kind: Validator
  headers:
    Content-Type:
      values:
      - application/json
- name: mockRequest
  kind: RequestBuilder
  template: |
    method: GET
    url: /hello
- name: proxy1
  kind: Proxy
  pools:
  - servers:
    - url: http://127.0.0.1:9095
- name: proxy2
  kind: Proxy
  pools:
  - servers:
    - url: http://127.0.0.2:9095
' | egctl apply -f -
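One way to picture how an alias distinguishes two appearances of the same filter is the following Python sketch. It is a hypothetical model, not Easegress code: the helpers `label` and `resolve_jump` are invented for illustration, and the sketch assumes that a flow entry without an alias is addressed by its filter name.

```python
FLOW = [
    {"filter": "validator", "jumpIf": {"invalid": "mockRequestForProxy2"}},
    {"filter": "mockRequest"},
    {"filter": "proxy1"},
    {"filter": "END"},
    {"filter": "mockRequest", "alias": "mockRequestForProxy2"},
    {"filter": "proxy2"},
]


def label(node):
    """Assumption: an entry is addressed by its alias, else by its filter name."""
    return node.get("alias", node["filter"])


def resolve_jump(flow, from_index, target):
    """Return the index of the first later entry whose label equals `target`."""
    for j in range(from_index + 1, len(flow)):
        if label(flow[j]) == target:
            return j
    raise ValueError("no later flow entry is labeled %r" % target)


index = resolve_jump(FLOW, 0, "mockRequestForProxy2")
print(index, FLOW[index]["filter"])  # prints: 4 mockRequest
```

Both entries run the same mockRequest definition, but only the second one answers to the label mockRequestForProxy2, so the jump from validator lands on the entry that precedes proxy2.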
Namespace
- The pipeline can handle multiple requests/responses. Requests/responses are grouped by namespaces, and each namespace can have at most one request and one response.
- We can set the namespace attribute of a filter to tell it which request/response it should use.
- RequestBuilder and ResponseBuilder can access all requests and responses; they output the resulting request/response to the namespace they are assigned.
- The default namespace is DEFAULT.
echo '
name: pipeline-api
kind: Pipeline
flow:
- filter: copyRequest
  namespace: demo1
- filter: copyRequest
  namespace: demo2
- filter: copyRequest
  namespace: demo3
- filter: proxy-demo1
  namespace: demo1
- filter: proxy-demo2
  namespace: demo2
- filter: proxy-demo3
  namespace: demo3
- filter: buildResponse
filters:
- name: copyRequest
  kind: RequestBuilder
  sourceNamespace: DEFAULT
- name: proxy-demo1
  kind: Proxy
  pools:
  - servers:
    - url: https://demo1
- name: proxy-demo2
  kind: Proxy
  pools:
  - servers:
    - url: https://demo2
- name: proxy-demo3
  kind: Proxy
  pools:
  - servers:
    - url: https://demo3
- name: buildResponse
  kind: ResponseBuilder
  template: |
    statusCode: 200
    body: [{{.responses.demo1.Body}}, {{.responses.demo2.Body}}, {{.responses.demo3.Body}}]
' | egctl create -f -
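The namespace bookkeeping in this flow can be sketched in Python as a dictionary that holds at most one request and one response per namespace. Everything here is a hypothetical model: the helper names, the dict layout, and the placeholder bodies are assumptions made for illustration, not Easegress internals.

```python
import copy

DEFAULT = "DEFAULT"


def new_context(request):
    # One slot per namespace: at most one request and one response.
    return {DEFAULT: {"request": request, "response": None}}


def copy_request(ctx, namespace, source_namespace=DEFAULT):
    """Model of copyRequest: copy the source request into `namespace`."""
    ctx.setdefault(namespace, {"request": None, "response": None})
    ctx[namespace]["request"] = copy.deepcopy(ctx[source_namespace]["request"])


def fake_proxy(ctx, namespace, body):
    """Stand-in for a Proxy: store a response in the filter's namespace."""
    ctx[namespace]["response"] = {"statusCode": 200, "body": body}


def build_response(ctx, sources, namespace=DEFAULT):
    """Model of buildResponse: read several namespaces, write to one."""
    bodies = [ctx[ns]["response"]["body"] for ns in sources]
    ctx[namespace]["response"] = {
        "statusCode": 200,
        "body": "[" + ", ".join(bodies) + "]",
    }


namespaces = ("demo1", "demo2", "demo3")
ctx = new_context({"method": "GET", "path": "/api"})
for ns in namespaces:
    copy_request(ctx, ns)
for ns in namespaces:
    fake_proxy(ctx, ns, '"body-from-%s"' % ns)  # placeholder bodies
build_response(ctx, namespaces)
print(ctx[DEFAULT]["response"]["body"])
```

The final response lands in DEFAULT because buildResponse has no namespace attribute in the flow, while each proxy only touches the namespace it was assigned.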
Usage
GlobalFilter
GlobalFilter is a special pipeline that can be executed before and/or after all pipelines in a server. For example:
name: globalFilter-example
kind: GlobalFilter
beforePipeline:
  flow:
  - filter: validator
  filters:
  - name: validator
    kind: Validator
    ...
afterPipeline:
  ...
---
name: server-example
kind: HTTPServer
globalFilter: globalFilter-example
...
In this case, all requests in HTTPServer server-example go through GlobalFilter globalFilter-example before and after executing any other pipelines.
Load Balancer
A reverse proxy is common middleware that clients access directly; it forwards their requests to backend servers. Acting as a reverse proxy is one of the core roles of Easegress.
The Proxy filter sends requests to backend servers.
It contains groups of servers under load balancing, and the policy supports roundRobin, random, weightedRandom, ipHash, and headerHash. For more information, please check load balance.
name: pipeline-reverse-proxy
kind: Pipeline
flow:
  - filter: proxy
filters:
  - name: proxy
    kind: Proxy
    pools:
    - servers:
      - url: http://127.0.0.1:9095
      - url: http://127.0.0.1:9096
      - url: http://127.0.0.1:9097
      loadBalance:
        policy: roundRobin
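To give a feel for what two of these policies mean, here is a minimal Python sketch of round-robin and hash-based selection over the three servers above. It shows the general techniques only: the function names are hypothetical, and CRC32 is an assumed hash chosen for the illustration, not necessarily what Easegress uses.

```python
import itertools
import zlib

SERVERS = [
    "http://127.0.0.1:9095",
    "http://127.0.0.1:9096",
    "http://127.0.0.1:9097",
]


def round_robin(servers):
    """Yield servers in order, wrapping around forever."""
    return itertools.cycle(servers)


def header_hash(servers, header_value):
    """Pick a server from a hash of a header value (CRC32 is an assumption)."""
    digest = zlib.crc32(header_value.encode("utf-8"))
    return servers[digest % len(servers)]


picker = round_robin(SERVERS)
for _ in range(4):
    print(next(picker))  # the fourth pick wraps back to the first server

# The same header value always maps to the same server.
print(header_hash(SERVERS, "user-42") == header_hash(SERVERS, "user-42"))
```

Round-robin spreads requests evenly without looking at them, whereas a hash-based policy keeps requests carrying the same key on the same backend.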
Traffic Adaptor: Change Something of Two-Way Traffic
Sometimes backend applications can’t keep up with quickly changing traffic requirements. Easegress can act as an adaptor between new traffic and old applications. There are two phases of adaptation in a reverse proxy: request adaptation and response adaptation. RequestAdaptor supports adapting the method, path, header, and body. ResponseAdaptor supports adapting the header and body. The order of filters in the flow field of the spec determines when each adaptation is applied: requestAdaptor runs before proxy, and responseAdaptor runs after it.
name: pipeline-reverse-proxy
kind: Pipeline
flow:
  - filter: requestAdaptor
  - filter: proxy
  - filter: responseAdaptor
filters:
  - name: requestAdaptor
    kind: RequestAdaptor
    host: easegress.megaease.com
    method: POST
    path:
      trimPrefix: /apis/v2
    header:
      set:
        X-Api-Version: v2
  - name: responseAdaptor
    kind: ResponseAdaptor
    header:
      set:
        Server: Easegress v1.0.0
      add:
        X-Easegress-Pipeline: pipeline-reverse-proxy
  - name: proxy
    kind: Proxy
    pools:
    - servers:
      - url: http://127.0.0.1:9095
      - url: http://127.0.0.1:9096
      - url: http://127.0.0.1:9097
      loadBalance:
        policy: roundRobin
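The effect of the two adaptors in this spec can be sketched in Python as plain transformations on dictionaries. The request and response shapes and the function names are hypothetical; only the values (POST, the host, the /apis/v2 prefix, and the header names) come from the spec above.

```python
def adapt_request(request):
    """Model of the requestAdaptor settings in the spec above."""
    adapted = dict(request, headers=dict(request["headers"]))
    adapted["method"] = "POST"
    adapted["host"] = "easegress.megaease.com"
    prefix = "/apis/v2"
    if adapted["path"].startswith(prefix):  # path.trimPrefix
        adapted["path"] = adapted["path"][len(prefix):]
    adapted["headers"]["X-Api-Version"] = "v2"  # header.set
    return adapted


def adapt_response(response):
    """Model of the responseAdaptor settings; header values are lists."""
    headers = {key: list(values) for key, values in response["headers"].items()}
    # `set` replaces any existing values.
    headers["Server"] = ["Easegress v1.0.0"]
    # `add` appends to whatever is already there.
    headers.setdefault("X-Easegress-Pipeline", []).append("pipeline-reverse-proxy")
    return dict(response, headers=headers)


incoming = {"method": "GET", "host": "example.com",
            "path": "/apis/v2/users", "headers": {}}
print(adapt_request(incoming))

backend_reply = {"statusCode": 200, "headers": {"Server": ["nginx"]}}
print(adapt_response(backend_reply))
```

In this model the backend sees a POST to /users with X-Api-Version set, and the client sees the rewritten Server header plus the added X-Easegress-Pipeline header.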
API Aggregation
API aggregation is a pattern that aggregates multiple individual requests into a single request. This pattern is useful when a client must make multiple calls to different backend systems to perform one operation. Easegress provides the RequestBuilder and ResponseBuilder filters for this feature.
Example 1: Simple aggregation
- Suppose we have three backend services called demo1, demo2, and demo3. We want to call these three services and combine their responses into one. demo1 returns {"mega":"ease"} as the HTTP response body, demo2 returns {"hello":"world"}, and demo3 returns {"hello":"new world"}.
echo '
name: pipeline-api
kind: Pipeline
flow:
- filter: copyRequest
  namespace: demo1
- filter: copyRequest
  namespace: demo2
- filter: copyRequest
  namespace: demo3
- filter: proxy-demo1
  namespace: demo1
- filter: proxy-demo2
  namespace: demo2
- filter: proxy-demo3
  namespace: demo3
- filter: buildResponse
filters:
- name: copyRequest
  kind: RequestBuilder
  sourceNamespace: DEFAULT
- name: proxy-demo1
  kind: Proxy
  pools:
  - servers:
    - url: https://demo1
- name: proxy-demo2
  kind: Proxy
  pools:
  - servers:
    - url: https://demo2
- name: proxy-demo3
  kind: Proxy
  pools:
  - servers:
    - url: https://demo3
- name: buildResponse
  kind: ResponseBuilder
  template: |
    statusCode: 200
    body: |
      [{{.responses.demo1.Body}}, {{.responses.demo2.Body}}, {{.responses.demo3.Body}}]
' | egctl create -f -
- Create an HTTPServer to forward the traffic to this pipeline.
 
echo '
kind: HTTPServer
name: server-demo
port: 10080
keepAlive: true
https: false
rules:
  - paths:
    - pathPrefix: /api
      backend: pipeline-api
' | egctl create -f -
- Send a request to the pipeline
 
$ curl  -X GET  http://127.0.0.1:10080/api -v
[{"mega":"ease"}, {"hello":"world"}, {"hello":"new world"}]
Example 2: Merge response body
- In Example 1, the responses of demo2 and demo3 share the same JSON key. We want to merge the response bodies by JSON key. If there are duplicated keys, the last value is used.
- Update the pipeline spec
 
echo '
name: pipeline-api
kind: Pipeline
flow:
- filter: copyRequest
  namespace: demo1
- filter: copyRequest
  namespace: demo2
- filter: copyRequest
  namespace: demo3
- filter: proxy-demo1
  namespace: demo1
- filter: proxy-demo2
  namespace: demo2
- filter: proxy-demo3
  namespace: demo3
- filter: buildResponse
filters:
- name: copyRequest
  kind: RequestBuilder
  sourceNamespace: DEFAULT
- name: proxy-demo1
  kind: Proxy
  pools:
  - servers:
    - url: https://demo1
- name: proxy-demo2
  kind: Proxy
  pools:
  - servers:
    - url: https://demo2
- name: proxy-demo3
  kind: Proxy
  pools:
  - servers:
    - url: https://demo3
- name: buildResponse
  kind: ResponseBuilder
  template: |
    statusCode: 200
    body: |
      {{mergeObject .responses.demo1.JSONBody .responses.demo2.JSONBody .responses.demo3.JSONBody | toRawJson}}
' | egctl apply -f -
- Send a request to the pipeline
 
$ curl  -X GET  http://127.0.0.1:10080/api -v
{"hello":"new world","mega":"ease"}
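The "last value wins" rule can be checked with a few lines of Python. This sketch only models a merge of flat objects like the three bodies above; `merge_objects` is a hypothetical helper, and how mergeObject treats nested objects is not covered here.

```python
import json


def merge_objects(*objects):
    """Merge flat objects left to right; later values overwrite earlier ones."""
    merged = {}
    for obj in objects:
        merged.update(obj)
    return merged


bodies = ['{"mega":"ease"}', '{"hello":"world"}', '{"hello":"new world"}']
merged = merge_objects(*(json.loads(body) for body in bodies))

# Compact output with sorted keys, to match the response shown above.
print(json.dumps(merged, sort_keys=True, separators=(",", ":")))
# prints {"hello":"new world","mega":"ease"}
```

The value from demo3 replaces the one from demo2 because demo3 is the last argument, which is why the order of arguments in the template matters.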
Example 3: Handle Failures
In Example 1, if one of the backend services encounters a failure, the pipeline will produce a wrong result. We can improve the pipeline to handle this kind of failure.
- Update the pipeline spec
 
echo '
name: pipeline-api
kind: Pipeline
flow:
- filter: copyRequest
  namespace: demo1
- filter: copyRequest
  namespace: demo2
- filter: copyRequest
  namespace: demo3
- filter: proxy-demo1
  namespace: demo1
  jumpIf: { serverError: proxy-demo2 }
- filter: proxy-demo2
  namespace: demo2
- filter: proxy-demo3
  namespace: demo3
- filter: buildResponse
filters:
- name: copyRequest
  kind: RequestBuilder
  sourceNamespace: DEFAULT
- name: proxy-demo1
  kind: Proxy
  pools:
  - servers:
    - url: https://demo1
- name: proxy-demo2
  kind: Proxy
  pools:
  - servers:
    - url: https://demo2
- name: proxy-demo3
  kind: Proxy
  pools:
  - servers:
    - url: https://demo3
- name: buildResponse
  kind: ResponseBuilder
  template: |
    statusCode: {{$code := 503}}{{range $resp := .responses}}{{if eq $resp.StatusCode 200}}{{$code = 200}}{{break}}{{end}}{{end}}{{$code}}
    body: |
      {{$first := true -}}
      [{{range $resp := .responses -}}
        {{if eq $resp.StatusCode 200 -}}
          {{if not $first}},{{end}}{{$resp.Body}}{{$first = false -}}
        {{- end}}
      {{- end}}]
' | egctl apply -f -
- Stop service demo1 and send a request to the pipeline
$ curl  -X GET  http://127.0.0.1:10080/api -v
[{"hello":"world"},{"hello":"new world"}]
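The logic of the buildResponse template can be restated in Python to make it easier to follow. This is a hedged paraphrase, not the template engine itself: `build_response` and the dict layout are hypothetical, and the sketch visits namespaces in sorted name order on the assumption that the template ranges over the responses in that order.

```python
def build_response(responses):
    """Keep only the bodies of successful responses.

    responses: namespace -> {"statusCode": int, "body": str}
    Returns (status code, body) for the aggregated response.
    """
    ok_bodies = [
        resp["body"]
        for _, resp in sorted(responses.items())
        if resp["statusCode"] == 200
    ]
    # 200 if at least one backend succeeded, otherwise 503.
    status = 200 if ok_bodies else 503
    return status, "[" + ",".join(ok_bodies) + "]"


responses = {
    "demo1": {"statusCode": 503, "body": ""},  # demo1 is stopped
    "demo2": {"statusCode": 200, "body": '{"hello":"world"}'},
    "demo3": {"statusCode": 200, "body": '{"hello":"new world"}'},
}
status, body = build_response(responses)
print(status, body)
```

With demo1 down, the sketch yields status 200 and the two remaining bodies; if every backend failed, it would yield 503 and an empty list.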