Apache kafka

  • Apache Software Foundation의 Scalar 언어로 된 오픈 소스 메시지 브로커 프로젝트
    • 메시지 브로커
      • Publisher로 부터 전달받은 메시지를 Subscriber로 전달해주는 중간 역할
      • 응용 소프트웨어 간에 메시지를 교환할 수 있게 한다.
      • 이 때 메시지가 적재되는 공간을 Message Queue라고 하며 메시지의 그룹을 Topic이라고 한다.
  • 링크드인에서 개발, 2011년 오픈 소스화
    • 2014년 11월 링크드인에서 kafka를 개발하던 엔지니어들이 kafka개발에 집중하기 위해 Confluent라는 회사 창립
  • 실시간 데이터 피드를 관리하기 위해 통일된 높은 처리량, 낮은 지연 시간을 지닌 플랫폼 제공
  • Producer(송신자)/Consumer(수신자) 분리
  • 메시지를 여러 Consumer에게 허용
  • 높은 처리량을 위한 메시지 최적화
  • Scale-out 가능(클러스터)
  • Eco-system

Kafka Broker

  • 실행 된 kafka 애플리케이션 서버
  • 3대 이상의 Broker Cluster 구성
  • Zookeeper 연동
    • 역할 : 메타데이터 저장
    • Controller 정보 저장
  • n개 Broker 중 1대는 Controller 기능 수행
    • Controller 역할
      • 각 Broker에게 담당 파티션 할당 수행
      • Broker 정상 동작 모니터링

Ecosystem

  • kafka와 데이터를 주고받기 위해 사용하는 Java Library
    • kafka-clients
  • Producer, Consumer, Admin, Stream 등 kafka 관련 API 제공
  • 다양한 3rd party library 존재 : C/C++, Node.js, Python, .NET

Kafka 서버 기동

- Zookeeper 및 kafka 서버 구동
    - $KAFKA_HOME\bin\windows\zookeeper-server-start.bat $KAFKA_HOME\config\zookeeper.properties
    - $KAFKA_HOME\bin\windows\kafka-server-start.bat $KAFKA_HOME\config\server.properties
- Topic 생성
    - $KAFKA_HOME\bin\window\kafka-topics.bat —-create -—topic quickstart-events —-bootstrap-server loacalhost:9092 \ -—partitions 1
- Topic 목록 확인
    - $KAFKA_HOME\bin\window\kafka-topics.bat -—bootstrap-server localhost:9092 —list
- Topic 정보 확인
    - $KAFKA_HOME\bin\window\kafka-topics.bat -—describe -—topic quickstart-events -—bootstrap-server localhost:9092
- 메시지 생산
    - $KAFKA_HOME\bin\window\kafka-console-producer.bat --broker-list localhost:9092 —-topic quickstart-events
- 메시지 소비
    - $KAFKA_HOME\bin\window\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic quickstart-events \ --from-beginning
  • topic생성 후 목록과 정보를 확인해봄

  • 메시지 확인!

Kafka Connect

  • kafka Connect를 통해 Data를 Import/Export 가능
  • 코드없이 Configuration으로 데이터를 이동
  • Standalone mode, Distribution mode 지원
    • RESTful API 통해 지원
    • Stream 또는 Batch 형태로 데이터 전송 가능
    • 커스텀 Connector를 통한 다양한 Plugin 제공
  • 테스트를 위한 MariaDB 설치완료

'DevOps > MSA' 카테고리의 다른 글

Spring Cloud Gateway - Load Balancer  (0) 2023.05.15
Spring Cloud Gateway - Filter 적용  (0) 2023.05.14
Spring Cloud Gateway  (0) 2023.05.14
Spring Cloud 와 Eureka  (0) 2023.05.14

Spring Cloud Gateway - Load Balancer

  • 현재 2개의 서비스가 있음 8081,8082
  • 클라이언트에서 api gateway를 통과하여 요청을 보내면 유레카에 전달되어 위치정보를 전달받고 해당 정보로 포워딩 시켜준다.
  • gateway yml파일 각 라우트에 uri 서비스 이름으로 변경
server:
    port: 8080

eureka:
    client:
        register-with-eureka: true
        fetch-registry: true
        service-url:
            defaultZone: <http://localhost:8761/eureka>
spring:
    application:
        name: gateway-service
    cloud:
        gateway:
            default-filters:
              - name: GlobalFilter
                args:
                  baseMessage: Spring Cloud Gateway Filter
                  preLogger: true
                  PostLogger: true
            routes:
              - id: first-service
                uri: lb://my-first-service
                predicates:
                    - Path=/first-service/**
                filters:
                  #- AddRequestHeader=first-request, first-request-header2
                  #- AddResponseHeader=first-response, first-response-header2
                  - CustomFilter
              - id: first-service
                uri: lb://my-second-service
                predicates:
                    - Path=/second-service/**
                filters:
                    #- AddRequestHeader=second-request, second-request-header2
                    #- AddResponseHeader=second-response, second-response-header2
                    - CustomFilter
  • userService yml파일
server:
  # 0? ?? ?? ??? ?????? ??! ?? ??? ???? ??? ?? ??? ??.
  port: 0

spring:
  application:
    name: ${NAME}

eureka:
   instance:
      instance-id: ${spring.application.name}:${spring.application.instance_id:${random.value}}
   client:
    register-with-eureka: true
    # Eureka ??? ?? ?????? ??? ????? ??? ???
    # ???? ?? -> fetch-registry = true ?? ? ??? ???? ??
    fetch-registry: true
    service-url:
      defaultZone: http://127.0.0.1:8761/eureka
  • uri: lb://my-first-service 이렇게 이름으로 정보를 가져오게 되면 포트정보나 기타 정보가 변경된다 하더라도 정상적으로 가져올 수 있다.

유저 서비스에서 랜덤 포트로 first 2개 second 2개를 띄워보자

  • 포트 번호를 반환하는 api
private final Environment env;
    @GetMapping("first-service/check")
    public String check(HttpServletRequest request){
        log.info("Server port={}",request.getServerPort());
        return String.format("HI Port %s",env.getProperty("local.server.port"));
    }

유레카 현황

  • 라운드 로빈 방식으로 차례차례 해당 서버에 할당되었음을 확인!

'DevOps > MSA' 카테고리의 다른 글

Apache kafka  (0) 2023.05.24
Spring Cloud Gateway - Filter 적용  (0) 2023.05.14
Spring Cloud Gateway  (0) 2023.05.14
Spring Cloud 와 Eureka  (0) 2023.05.14

API Gateway Filter

클라이언트의 요청에 따라 Gateway에서 어떤 서비스로 갈지 판단한 후 요청을 분기해준다.

https://www.inflearn.com/course/lecture?courseSlug=%EC%8A%A4%ED%94%84%EB%A7%81-%ED%81%B4%EB%9D%BC%EC%9A%B0%EB%93%9C-%EB%A7%88%EC%9D%B4%ED%81%AC%EB%A1%9C%EC%84%9C%EB%B9%84%EC%8A%A4&unitId=68416&tab=community&category=questionDetail

  • Gateway Handler Mapping
    • 어떤 요청이 들어왔는지 요청정보를 받는다.
  • Predicate
    • 사전 조건을 분기해주는 역할
  • Pre filter
    • 사전 필터
  • Post filter
    • 사후 필터

Gateway에 filterConfig를 추가하자

package com.example.gateway.config;

import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FilterConfig {
    @Bean
    public RouteLocator gatewayRoutes(RouteLocatorBuilder builder){
        return builder.routes()
                .route(r -> r.path("/first-service/**")
                        .filters(f -> f.addRequestHeader("first-request","first-request-header")
                                .addResponseHeader("first-response","first-response-header"))
                        .uri("<http://localhost:8081>"))
                .route(r -> r.path("/second-service/**")
                        .filters(f -> f.addRequestHeader("second-request","second-request-header")
                                .addResponseHeader("second-response","second-response-header"))
                        .uri("<http://localhost:8082>"))
                .build();
    }
}
  • first-service로 오는 모든 요청에 first-request : first-request-header 헤더, 모든 응답에 first-response : first-response-header를 적용한다.
  • second도 마찬가지로 설정

UserService에 헤더 정보를 log로 찍어보자

package com.example.userservice;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;

@RestController("/")
@Slf4j
public class UserController {

    @GetMapping("first-service")
    public String first(){
        log.info("first 호출됨");
        return "first-service";
    }
    @GetMapping("first-service/message")
    public String firstMessage(@RequestHeader("first-request") String header){
        log.info(header);
        return "first-service";
    }

    @GetMapping("second-service")
    public String second(){
        log.info("second 호출됨");
        return "second-service";
    }
    @GetMapping("second-service/message")
    public String secondMessage(@RequestHeader("second-request") String header){
        log.info(header);
        return "second-service";
    }
}
  • request 정상적으로 받아오는 것 확인!

 

  • response 또한 확인

 

application.yml 설정정보를 이용해 filter를 적용해보자

server:
    port: 8080

eureka:
    client:
        register-with-eureka: false
        fetch-registry: false
        service-url:
            defaultZone: <http://localhost:8761/eureka>
spring:
    application:
        name: gateway-service
    cloud:
        gateway:
            routes:
              - id: first-service
                uri: <http://localhost:8081/>
                predicates:
                    - Path=/first-service/**
                filters:
                  - AddRequestHeader=first-request, first-request-header2
                  - AddResponseHeader=first-response, first-response-header2
              - id: first-service
                uri: <http://localhost:8082/>
                predicates:
                    - Path=/second-service/**
                filters:
                    - AddRequestHeader=second-request, second-request-header2
                    - AddResponseHeader=second-response, second-response-header2

 

  • 정상작동 확인!!

Spring Cloud Gateway Custom Filter

  • 필터 생성
package com.example.gateway.filter;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

@Component
@Slf4j
public class CustomFilter extends AbstractGatewayFilterFactory<CustomFilter.Config> {
    public CustomFilter() {
        super(Config.class);
    }
    @Override
    public GatewayFilter apply(Config config) {
       //Custom Prefilter
        return ((exchange, chain) -> {
           ServerHttpRequest request = exchange.getRequest();
           ServerHttpResponse response = exchange.getResponse();

           log.info("Custom PRE filter : request id -> {}", request.getId());

           //Custom Post Filter
            return chain.filter(exchange).then(Mono.fromRunnable(()->{
                log.info("Custom POST filter : response code -> {}", response.getStatusCode());
            }));
        });
    }

    public static class Config{
        //configuration 정보를 기입
    }
}
  • yml 파일 수정
server:
    port: 8080

eureka:
    client:
        register-with-eureka: false
        fetch-registry: false
        service-url:
            defaultZone: <http://localhost:8761/eureka>
spring:
    application:
        name: gateway-service
    cloud:
        gateway:
            routes:
              - id: first-service
                uri: <http://localhost:8081/>
                predicates:
                    - Path=/first-service/**
                filters:
                  #- AddRequestHeader=first-request, first-request-header2
                  #- AddResponseHeader=first-response, first-response-header2
                  - CustomFilter
              - id: first-service
                uri: <http://localhost:8082/>
                predicates:
                    - Path=/second-service/**
                filters:
                    #- AddRequestHeader=second-request, second-request-header2
                    #- AddResponseHeader=second-response, second-response-header2
                  - CustomFilter
  • userService의 매핑url 추가
@GetMapping("second-service/check")
    public String checkSecond(){
        return "Hi, there, second check";
    }

    @GetMapping("first-service/check")
    public String checkFirst(){
        return "Hi, there, first check";
    }

정상 작동 확인

Spring Cloud Gateway - Global Filter

  • 위에 사용했던 필터와 차이점
    • 어떠한 라우트 정보가 실행된다 하더라도 공통적으로 수행될 수 있는 필터

  • 개별적으로 등록된 CustomFilter
  • Global Filter 생성
package com.example.gateway.filter;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

@Component
@Slf4j
public class GlobalFilter extends AbstractGatewayFilterFactory<GlobalFilter.Config> {
    public GlobalFilter() {
        super(Config.class);
    }
    @Override
    public GatewayFilter apply(Config config) {
       //Custom Prefilter
        return ((exchange, chain) -> {
           ServerHttpRequest request = exchange.getRequest();
           ServerHttpResponse response = exchange.getResponse();

           log.info("Global Filter baseMessage : request id -> {}", config.getBaseMessage());

           if(config.isPreLogger()){
               log.info("Global Filter Start: request id -> {}", request.getId());
           }
           //Custom Post Filter
            return chain.filter(exchange).then(Mono.fromRunnable(()->{
                if(config.isPostLogger()){
                    log.info("Global Filter end: response code -> {}", response.getStatusCode());
                }
            }));
        });
    }
    @Data
    public static class Config{
        //configuration 정보를 기입
        private String baseMessage;
        private boolean preLogger;
        private boolean postLogger;
    }
}
  • yml 파일 수정
server:
    port: 8080

eureka:
    client:
        register-with-eureka: false
        fetch-registry: false
        service-url:
            defaultZone: <http://localhost:8761/eureka>
spring:
    application:
        name: gateway-service
    cloud:
        gateway:
            default-filters:
              - name: GlobalFilter
                args:
                  baseMessage: Spring Cloud Gateway Filter
                  preLogger: true
                  PostLogger: true
            routes:
              - id: first-service
                uri: <http://localhost:8081/>
                predicates:
                    - Path=/first-service/**
                filters:
                  #- AddRequestHeader=first-request, first-request-header2
                  #- AddResponseHeader=first-response, first-response-header2
                  - CustomFilter
              - id: first-service
                uri: <http://localhost:8082/>
                predicates:
                    - Path=/second-service/**
                filters:
                    #- AddRequestHeader=second-request, second-request-header2
                    #- AddResponseHeader=second-response, second-response-header2
                    - CustomFilter

  • Global filter가 실행되고 Global filter가 종료되기 전 CustomFilter가 수행, 종료되고 그 이후 Global filter가 종료되는 모습을 확인할 수 있다.

'DevOps > MSA' 카테고리의 다른 글

Apache kafka  (0) 2023.05.24
Spring Cloud Gateway - Load Balancer  (0) 2023.05.15
Spring Cloud Gateway  (0) 2023.05.14
Spring Cloud 와 Eureka  (0) 2023.05.14

API Gateway Service

  • API Gateway는 모든 서버로의 요청을 단일지점을 거쳐서 처리하도록 한다. 이를 통해 공통된 로직처리나 인증 및 인가 처리, 라우팅 처리등을 할 수 있다.
  • 시스템의 내부 구조는 숨기고 외부의 요청에 의해 적절한 형태로 응답할 수 있는 장점이 있다.

SpringCloud에서의 MSA간 통신

RestTemplate

  • 전통적으로 다른 서버와의 통신을 위한 방법

FeignClient

  • 특정한 인터페이스를 만들고 이름을 등록user라는 서비스에서 feign client 등록하고 사용하면 직접적인 서버의 주소라던가 포트번호 없이 마이크로 서비스 이름을 가지고 호출할 수 있게된다.
    • 문제는 로드밸런싱을 하기위해 어디에 구축해서 작업할 것인가?
      • Ribbon(netflix 로드밸런싱 기능)
        • 비동기 처리 리액티브 방식과는 호환이 안됨 → 최근에는 잘 사용하지 않는다.
        • 마이크로서비스의 이름만으로 호출할 수 있다.
        • Gateway의 역할을 client side에서 처리하는 방식
    Spring Cloud Gateway
    • Dependencies
      • DevTools, Eureka Discovery Client, Gateway
    • application.yml
    • userService에 Controller를 구현하고 테스트 해보자
server:
    port: 8080

eureka:
    client:
        register-with-eureka: false
        fetch-registry: false
        service-url:
            defaultZone: http://localhost:8761/eureka
spring:
    application:
        name: gateway-service
    cloud:
        gateway:
            routes:
              - id: first-service
                uri: http://localhost:8081/
                predicates:
                    - Path=/first-service/**
              - id: first-service
                uri: http://localhost:8082/
                predicates:
                    - Path=/second-service/**
package com.example.userservice;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController("/")
@Slf4j
public class UserController {

    @GetMapping("first-service")
    public String first(){
        log.info("first 호출됨");
        return "first-service";
    }

    @GetMapping("second-service")
    public String second(){
        log.info("second 호출됨");
        return "second-service";
    }
}

 

각각 다른 서버에서 호출되어야할 api가 호출되는것을 확인!

 

 

'DevOps > MSA' 카테고리의 다른 글

Apache kafka  (0) 2023.05.24
Spring Cloud Gateway - Load Balancer  (0) 2023.05.15
Spring Cloud Gateway - Filter 적용  (0) 2023.05.14
Spring Cloud 와 Eureka  (0) 2023.05.14

Spring Cloud란?

  • MSA 구현을 위한 도구 모음
  • MSA의 개발, 배포, 운영에 필요한 아키텍쳐를 쉽게 구성할 수 있도록 지원하는 Spring Boot 기반 프레임워크
  • 분산 시스템 상에 필요한 여러 패턴들을 표준 패턴화 시켜 손쉽게 개발할 수 있도록 지원함

어떤 서비스가 사용되어야 할까?

  • Spring Cloud Config Server
    • 환경 설정관리를 위함
    • 다양한 마이크로서비스에서 사용하는 정보를 SpringCloudConfigServer를 통해 외부의 저장소에 저장할 수 있다.
    • 유지보수성이 높아진다.
  • Location transparency
    • 서비스 등록과 위치정보 확인 Naming Server(Eureka)
  • Load Distribution(Load Balancing)
    • Ribbon (Client Side)
    • Spring Cloud Gateway(권장)
  • Easier REST Clients
    • 각각의 서비스가 데이터를 주고 받는 방법(RestTemplate , Feign Client)
  • Visibility and monitoring
    • ELK 등등 (로그 추적 및 모니터링)
  • Fault Tolerance
    • Hystrix
      • 서비스의 지연과 장애에 대해 내성을 갖게 해주는 라이브러리

Spring Cloud Netflix Eureka

  • Service Discovery
    • 외부에서 다른 서비스들이 마이크로 서비스를 검색하기 위해 사용되는 일종의 전화번호책이다.
  • API gateway에다가 요청을 보내고 요청 정보가 서비스 디스커버리에 전달되어 필요한 서비스의 위치를 알려준다.

 

  • Eureka 서버 구동 확인 및 application.yml 

server:
  port: 8761

spring:
  application:
    name: discoveryservice

eureka:
  # 이 설정의 default가 true -> 자기 자신을 전화번호부에 등록하는 것은 의미 없는 행위
  client:
    register-with-eureka: false
    fetch-registry: false
  • Eureka 서버에 등록된 userService 구동 확인 및 application.yml 

server:
  port: 9001

spring:
  application:
    name: user-service

eureka:
  client:
    register-with-eureka: true
    # Eureka 서버로 부터 인스턴스들의 정보를 주기적으로 가져올 것인지
    # 설정하는 속성 -> fetch-registry = true 갱신 된 정보를 받겠다는 설정
    fetch-registry: true
    service-url:
      defaultZone: http://127.0.0.1:8761/eureka

같은 서비스를 하나 더 띄워보자

  • application.yml 파일 내부의 port설정을 동적으로 추가해주는 코드를 넣고
  • edit configurations에서 환경변수를 추가해주어야 한다
    • 그렇지 않으면 포트번호가 겹치게 되어 실행 오류가 남
server:
  port: ${PORT}

spring:
  application:
    name: user-service

eureka:
  client:
    register-with-eureka: true
    # Eureka 서버로 부터 인스턴스들의 정보를 주기적으로 가져올 것인지
    # 설정하는 속성 -> fetch-registry = true 갱신 된 정보를 받겠다는 설정
    fetch-registry: true
    service-url:
      defaultZone: http://127.0.0.1:8761/eureka

 

정상작동 확인

랜덤 포트를 이용해보자

  • port값에 0을 넣고 실행시키면 랜덤으로 포트가 지정이 된다.
  • 두 서비스를 돌리고 유레카 서버를 확인해 보니 하나의 서버만 떠있는 상황!
  • 해당 설정을 추가해주고 돌려주면 해결됨!

server:
  # 0의 의미 랜덤 포트를 사용하겠다는 의미! 포트 충돌을 의식하지 않아도 되는 장점이 있다.
  port: 0

spring:
  application:
    name: user-service

eureka:
  ############해당 설정 ###############
  instance:
    instance-id: ${spring.cloud.client.hostname}:${spring.application.instance_id:${random.value}}
  ############해당 설정 ###############
 client:
    register-with-eureka: true
    # Eureka 서버로 부터 인스턴스들의 정보를 주기적으로 가져올 것인지
    # 설정하는 속성 -> fetch-registry = true 갱신 된 정보를 받겠다는 설정
    fetch-registry: true
    service-url:
      defaultZone: http://127.0.0.1:8761/eureka
  • 이러한 기능들을 사용하여 로드밸런싱 등을 쉽게 할 수 있다.

'DevOps > MSA' 카테고리의 다른 글

Apache kafka  (0) 2023.05.24
Spring Cloud Gateway - Load Balancer  (0) 2023.05.15
Spring Cloud Gateway - Filter 적용  (0) 2023.05.14
Spring Cloud Gateway  (0) 2023.05.14

+ Recent posts