Java 8에서 도입된 Parallel Stream은 데이터를 병렬로 처리하여 성능을 향상시키는 방법을 제공합니다. 내부적으로 Fork/Join Framework를 사용하여 작업을 여러 스레드에 분산시키고, 이러한 스레드들이 CPU 코어를 활용하여 동시에 작업을 처리할 수 있도록합니다. Parallel Stream을 사용하면 대량의 데이터 처리 작업을 더 쁘리게 수행할 수 있지만 모든 상황에서 서능 향상을 보장하는 것은 아닙니다.
Parallel Stream 에서 사용되는 스레드 개수는 대부분의 경우 JVM이 실행되는 CPU 코어 개수와 관련 있습니다. 기본적으로, Parallel Stream은 ForkJoinPool의 기본 인스턴스를 사용하는데, 이는 시스템의 CPU 코어 수에 해당하는 스레드를 가지고 있습니다. 정확히는, 사용가능한 CPU 코어 수에 1을 뺀 값으로 설정됩니다.
예를 들어, 하이퍼스레딩을 지원하는 프로세서의 경우 코어당 2개의 스레드를 제공합니다.
int parallelism = ForkJoinPool.commonPool().getParallelism();
System.out.println("parallelism = " + parallelism); // Intel i9 8코어의 경우 15개의 스레드
사용 시 고려 사항
- 스레드 오버헤드 : 작은 데이터 세트 또는 간단한 연산에 대해서는 병렬 처리 오버헤드가 성능 이득을 상쇄할 수 있습니다.
- 스레드 안전 : 병렬 스트림을 사용할 때는 스레드의 안전한 연산이 중요합니다. 상태를 변경하는 연산이나 스레드에 안전하지 않은 외부 자원을 사용하는 경우 주위가 필요합니다.
- 순서 의존성 : 병렬 스트림은 데이터 처리 순서를 보장하지 않습니다. 연산의 순서나 결과가 순서에 의존하는 경우 적합하지 않을 수 있습니다.
예시
대량의 데이터를 처리하고 각 데이터 항목에 대해 시간이 많이 소요되는 연산을 수행해야 하는 경우에 병렬 스트림을 사용할 수 있습니다.
예를 들어, 데이터 베이스에서 수백만 개의 고객 정보를 가져와서 각 고객에 대한 복잡한 계산을 수행하고 결과를 저장해야 한다고 가정해 봅시다.
public class ParallelStreamExample {
public static void main(String[] args) {
// 대량의 고객 ID 목록 가정하여 생성
List<Long> customerIds = Stream.iterate(1l, i -> i + 1)
.limit(1_000_000)
.collect(Collectors.toList());
// 병렬 스트림을 사용하여 각 고객에 대한 복잡한 계산 수행
List<CustomerData> processData = customerIds.parallelStream()
.map(customerId -> processCustomerData(customerId))
.collect(Collectors.toList());
System.out.println("processData = " + processData);
// 처리도니 데이터를 데이터 베이스에 저장하는 로직 (가정)
saveToDatabase(processData);
}
// 고객 데이터를 처리하는 메서드
private static CustomerData processCustomerData(Long customerId) {
// 복잡한 계산 수행 (시뮬레이션)
// 실제로는 고객 데이터를 조회하고 계산을 수행하는 로직이 포함될 것
return new CustomerData(customerId, "Processed Data" + customerId);
}
// 처리된 데이터를 데이터베이스에 저장하는 메소드 (가정)
private static void saveToDatabase(List<CustomerData> processData) {
// 데이터 베이스 저장 로직
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
static class CustomerData {
private Long customerId;
private String data;
}
}
ForkJoinPool 을 사용하여 스레드 개수 수동으로 변경하여 사용할 경우
public class ParallelStreamExample {
public static void main(String[] args) throws Exception {
// 대량의 고객 ID 목록 가정하여 생성
List<Long> customerIds = Stream.iterate(1L, i -> i + 1)
.limit(1_000_000)
.collect(Collectors.toList());
// 사용자 정의 ForkJoinPool 생성
ForkJoinPool customPool = new ForkJoinPool(4);
try {
// 사용자 정의 ForkJoinPool을 사용하여 병렬 스트림 처리
List<CustomerData> processData = customPool.submit(
() -> customerIds.parallelStream()
.map(customerId -> processCustomerData(customerId))
.collect(Collectors.toList())
).get(); // get()을 호출하여 작업이 완료될 때까지 기다림
// 처리된 데이터를 데이터 베이스에 저장하는 로직 (가정)
saveToDatabase(processData);
} finally {
customPool.shutdown(); // 풀 사용 완료 후 종료
}
}
// 고객 데이터를 처리하는 메서드
private static CustomerData processCustomerData(Long customerId) {
// 복잡한 계산 수행 (시뮬레이션)
return new CustomerData(customerId, "Processed Data" + customerId);
}
// 처리된 데이터를 데이터베이스에 저장하는 메소드 (가정)
private static void saveToDatabase(List<CustomerData> processData) {
// 데이터 베이스 저장 로직
}
// CustomerData 클래스 정의 (Lombok 어노테이션 사용 대신 직접 구현)
static class CustomerData {
private Long customerId;
private String data;
public CustomerData(Long customerId, String data) {
this.customerId = customerId;
this.data = data;
}
// Getter와 Setter 생략
}
}
'자바' 카테고리의 다른 글
자바에서의 불변 객체 (Immutable Object) (1) | 2024.04.03 |
---|---|
자바에서 동일성, 동등성 비교하기 (1) | 2024.04.03 |
Optional<T> (0) | 2024.02.06 |
Stream API (0) | 2024.02.02 |
Lamda Method Reference (0) | 2024.01.31 |