Java Spring WebClient
I. Giới thiệu
Trước đây, Spring cung cấp RestTemplate để gọi các REST API một cách dễ dàng chỉ qua vài bước. Tuy nhiên, từ Spring 5 RestTemplate đã chuyển sang chế độ bảo trì (maintenance mode, chỉ nhận sửa lỗi và thay đổi nhỏ), và Spring khuyến nghị dùng WebClient để thay thế.
WebClient cung cấp thêm nhiều tính năng mới, tiêu biểu là cơ chế non-blocking: luồng gọi không bị chặn trong lúc chờ response từ API.
II. Sử dụng
Cách sử dụng trong các project spring:
1. File pom.xml
1.1 webflux (chứa WebClient)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
1.2 reactor-netty (chứa thư viện mở rộng reactor netty)
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
</dependency>
1.3 override lại phiên bản reactor netty
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>1.0.10</version>
</dependency>
</dependencies>
</dependencyManagement>
2. WebClient Config
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import reactor.netty.http.client.HttpClient;
/**
 * Spring configuration that assembles the HTTP stack used by WebClient:
 * a Reactor Netty {@link HttpClient}, the {@link ClientHttpConnector} wrapping it,
 * and a {@link WebClient.Builder} preconfigured with that connector.
 *
 * <p>All timeout values are read from {@code rest.*} properties and are applied
 * as milliseconds.
 */
@Configuration
public class WebClientConfig {

    /** Connect timeout, passed to {@code CONNECT_TIMEOUT_MILLIS}. */
    @Value("${rest.connectTimeout}")
    private int connectTimeout;

    /** Injected from configuration; not referenced anywhere else in this class. */
    @Value("${rest.requestTimeout}")
    private int requestTimeout;

    /** Response timeout in milliseconds. */
    @Value("${rest.responseTimeout}")
    private int responseTimeout;

    /** Read timeout in milliseconds, enforced by a Netty {@link ReadTimeoutHandler}. */
    @Value("${rest.readTimeout}")
    private int readTimeout;

    /** Write timeout in milliseconds, enforced by a Netty {@link WriteTimeoutHandler}. */
    @Value("${rest.writeTimeout}")
    private int writeTimeout;

    /**
     * Creates the Reactor Netty client with connect, response, read and write timeouts.
     *
     * @return the configured {@link HttpClient}
     */
    @Bean
    public HttpClient httpClient() {
        final Duration maxResponseWait = Duration.ofMillis(responseTimeout);
        return HttpClient.create()
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
                .responseTimeout(maxResponseWait)
                .doOnConnected(connection -> {
                    // Read handler is registered first, then the write handler.
                    connection.addHandlerLast(new ReadTimeoutHandler(readTimeout, TimeUnit.MILLISECONDS));
                    connection.addHandlerLast(new WriteTimeoutHandler(writeTimeout, TimeUnit.MILLISECONDS));
                });
    }

    /**
     * Adapts the Reactor Netty client to Spring's connector abstraction.
     *
     * @param httpClient the Reactor Netty client bean
     * @return a connector backed by {@code httpClient}
     */
    @Bean
    public ClientHttpConnector clientHttpConnector(HttpClient httpClient) {
        final ClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
        return connector;
    }

    /**
     * Exposes a {@link WebClient.Builder} that uses the connector defined above.
     *
     * @param clientHttpConnector the connector bean
     * @return a builder preconfigured with {@code clientHttpConnector}
     */
    @Bean
    public WebClient.Builder webClientBuilder(ClientHttpConnector clientHttpConnector) {
        final WebClient.Builder builder = WebClient.builder();
        return builder.clientConnector(clientHttpConnector);
    }
}
3. Implement WebClient
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import com.google.gson.Gson;
import com.dto.client.ExceptionDto;
import com.dto.client.ResponseDto;
import com.exception.model.RestOperationException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@Service
@RequiredArgsConstructor
@Slf4j
public class ClientService {
// Lấy từ bean đã config ở trên
private final WebClient.Builder webClientBuilder;
private final Gson gson;
private final apiUrl = "http://localhost:8080/{id}";
/*
* Chạy đồng bộ (block()) đợi đến khi nào nhận kết quả khi gọi api mới đi tiếp (blocking)
*/
public ResponseDto getResponseDto(Long id) {
ResponseDto response = getResponseDtoAsync(id, null, null).block();
return response;
}
/*
* chạy bất đồng bộ (chương trình tiếp tục thực hiện các dòng code tiếp theo mà không phải chờ cho đến khi nhận response
* khi gọi api)
* @id: tham số truyền vào rest api
* @onRestOperationException: consumer xử lý khi có exception RestOperationException (Exception theo logic nghiệp vụ tự
* handler)
* @onException: consumer xử lý khi có exception chung
*/
public Mono<ResponseDto> getResponseDtoAsync(Long id,
Consumer<RestOperationException> onRestOperationException, Consumer<Exception> onException) {
Map<String, Object> params = new HashMap<>();
params.put("id", id);
Mono<ResponseDto> response = webClientBuilder.baseUrl(apiUrl).build()
.get()
.uri(uriBuilder -> {
URI uri = uriBuilder.build(params);
log.info("With Endpoint {}", uri);
return uri;
})
.retrieve()
.onStatus(
httpStatus -> HttpStatus.BAD_REQUEST.equals(httpStatus), clientResponse ->{
return clientResponse.bodyToMono(ExceptionDto.class)
.flatMap(exceptionDto -> {
log.error("id: {} error {}", id, gson.toJson(exceptionDto));
// Xử lý theo logic nghiệp vụ
return Mono.error(new RestOperationException(Integer.valueOf(exceptionDto.getCode()), gson.toJson(exceptionDto)));
});
}
)
.bodyToMono(new ParameterizedTypeReference<ResponseDto>() {
})
.doOnError(RestOperationException.class, onRestOperationException)
.doOnError(Exception.class, onException);
return response;
}
}
4. Test
import com.dto.client.ResponseDto;
import com.exception.model.RestOperationException;
import com.service.client.ClientService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@RequiredArgsConstructor
@Slf4j
public class WebcientTest {
private final ClientService clientService;
public void test() {
Mono<ResponseDto> response = clientService.getResponseDtoAsync(1L,
restException -> onRestOperationException(restException),
exception -> onException(exception));
response.subscribe(dto -> onFetchDto(dto));
}
private void onFetchDto(ResponseDto dto) {
log.info("dto {}", dto);
}
private void onRestOperationException(RestOperationException e) {
log.error("RestOperationException", e);
}
private void onException(Exception e) {
log.error("Exception", e);
}
}
Lỗi có thể gặp phải
reactor.core.Exceptions$ErrorCallbackNotImplemented:
org.springframework.web.reactive.function.client.WebClientRequestException:
Pending acquire queue has reached its maximum size of 1000;
nested exception is reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException:
Pending acquire queue has reached its maximum size of 1000
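Lỗi này xảy ra khi số request đang chờ lấy connection từ connection pool vượt quá giới hạn của hàng đợi (ở đây là 1000), nên các request tiếp theo bị từ chối. ErrorCallbackNotImplemented ở dòng đầu cho biết subscribe() đã được gọi mà không truyền hàm xử lý lỗi.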
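Để khắc phục vấn đề này, ta có thể cấu hình lại connection pool trong WebClientConfig (maxConnections và maxQueue là hai giá trị cấu hình cần khai báo thêm; ConnectionProvider nằm trong package reactor.netty.resources):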
/**
 * Creates the Reactor Netty client on top of a dedicated connection pool named
 * "myConnectionPool", sized by {@code maxConnections} and with a pending-acquire
 * queue limited to {@code maxQueue}, then applies the connect, response, read and
 * write timeouts (all in milliseconds).
 *
 * @return the configured {@link HttpClient}
 */
@Bean
public HttpClient httpClient() {
    final ConnectionProvider pool = ConnectionProvider.builder("myConnectionPool")
            .maxConnections(maxConnections)
            .pendingAcquireMaxCount(maxQueue)
            .build();
    final Duration maxResponseWait = Duration.ofMillis(responseTimeout);

    return HttpClient.create(pool)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
            .responseTimeout(maxResponseWait)
            .doOnConnected(connection -> {
                // Read handler is registered first, then the write handler.
                connection.addHandlerLast(new ReadTimeoutHandler(readTimeout, TimeUnit.MILLISECONDS));
                connection.addHandlerLast(new WriteTimeoutHandler(writeTimeout, TimeUnit.MILLISECONDS));
            });
}
Hoặc scale ứng dụng ra nhiều node và dùng load balancer để phân tải giữa các node.
Nhận xét
Đăng nhận xét