Data Engineering/Apache Spark

[Spark] 스파크 완벽 가이드 Ch09. 데이터 소스

minjiwoo 2023. 11. 27. 22:31
728x90

Spark 가 지원하는 데이터 소스를 읽고 쓰는 방법에 대한 단원이다. 

데이터 소스 읽기

spark.read.format("csv")\
    .option("mode", "FAILFAST")\ # 읽기 모드 
    .option("inferSchema", "true")\
    .schema(someSchema)\
    .load()
  • 데이터를 읽을 때는 DataFrameReader를 사용하며, 이는 SparkSession의 read 속성으로 접근한다. 
  • 포맷, 스키마, 읽기 모드, 옵션 과 같은 값들을 지정해주어야 한다.
  • 읽기 모드는 스파크가 형식에 맞지 않는 데이터를 만났을 때의 동작방식을 지정하는 옵션이다. 

읽기 모드

  • permissive : 오류 레코드의 모든 필드를 null로 설정하고 모든 오류 레코드를 _corrupt_record라는 문자열 컬럼에 기록 (기본값)
  • dropMalformed: 형식에 맞지 않는 레코드가 포함된 로우를 제거 
  • failFast : 형식에 맞지 않는 record가 나오면 즉시 종료  

데이터 소스 쓰기

데이터 읽기와 유사하며, DataFrameWriter 를 사용한다.  

df.write.format("csv").mode("overwrite").option("sep","\t")\
    .save("/tmp/my-file.tsv")
  • append: 해당 경로에 이미 존재하는 파일 목록에 결과 파일 추가하기 
  • overwrite: 이미 존재하는 모든 데이터를 완전히 덮어쓰기 
  • errorIfExists : 해당 경로에 데이터나 파일이 존재하는 경우 오류를 발생시키면서 쓰기 작업 실패 (기본값)
  • ignore : 해당 경로에 데이터나 파일이 존재하는 경우 아무런 처리를 하지 않음 

 

CSV 파일 

CSV (comma-separated values) 로 콤마 (,) 로 구분된 값을 의미한다. 각 줄이 단일 레코드가 되며 레코드의 각 필드를 콤마로 구분하는 일반적인 텍스트 파일 포맷이다. CSV 용 DataFrameReader를 생성해야 한다. 

spark.read.format("csv")

파일 읽기

csvFile = spark.read.format("csv")\
    .option("header", "true")\
    .option("mode", "FAILFAST")\
    .option("inferSchema", "true")\
    .load("/data/2010-summary.csv")

 

JSON 파일 

JSON 은 JavaScript Object Notation 으로, 자바스크립트에서 온 객체 표기법이다. 스파크에서는 줄로 구분된 JSON 을 기본적으로 사용한다. 

파일 읽기

spark.read.format("json").option("mode", "FAILFAST")\
    .option("inferSchema", "true")\
    .load("/data/flight-data/2010-summary.json").show(5)

파일 쓰기 

데이터 소스에 관계 없이 JSON 파일에 저장할 수 있다. 

csvFile.write.format("json").mode("overwrite").save("/tmp/my-json.json")

 

Parquet 파일 

  • parquet 파일은 컬럼 기반의 데이터 저장 방식으로, 전체 파일을 읽는 대신 개별 컬럼을 읽을 수 있으며 컬럼 기반의 압축 기능을 제공한다. 
  • 아파치 스파크와 잘 호환되므로 스파크의 기본 파일 포맷이기도 하다. 
  • 읽기 연산 시에 JSON 이나 CSV보다 훨씬 효율적으로 동작하므로 장기 저장용 데이터는 파케이 포맷으로 저장하는 것이 좋다. 
  • parquet 파일은 다른 데이터 소스에 비해 옵션이 거의 없다 !! 그 이유는 parquet 파일은 데이터를 저장할 떄 자체 스키마를 사용하여 데이터를 저장하기 때문이다. 따라서 정확한 스키마가 필요한 경우에 스키마를 설정할 수 있긴 하지만, 파케이 파일은 스키마가 파일 자체에 내장 되어 있다. 

파일 읽기

spark.read.format("parquet")\
    .load("/data/2010-summary.parquet").show(5)

파일 쓰기 

csvFile.write.format("parquet")\
    .save("/tmp/my-parquet.parquet")

 

ORC 파일 

하둡 워크로드를 위해 설계된 self-describing 이며 데이터 타입을 인식할 수 있는 컬럼 기반의 파일 포맷이다. 파케이와 큰 차이점이라고 한다면, 파케이는 스파크에 최적화된 반면 ORC는 하이브에 최적화 되어 있다. 

파일 읽기

spark.read.format("orc")\
    .load("/data/2010-summary.orc").show(5)

파일 쓰기 

csvFile.write.format("orc")\
    .mode("overwrite").save("/tmp/my-orc.orc")

SQL 데이터 베이스 

SQL 데이터 베이스에서 데이터를 읽는 방법은 다른 데이터 소스들과 동일하게 포맷과 옵션을 지정한 후 읽기 작업을 수행하면 된다. 

 

쿼리 푸시다운 

스파크는 DataFrame을 만들기 전에 데이터 베이스 자체에서 데이터를 필터링 할 수 있도록 만들 수 있다. 

데이터 베이스 병렬로 읽기 

numPartitions 옵션을 사용해 읽기 및 쓰기용 동시 작업 수를 제한할 수 있는 최대 파티션 수를 설정할 수 있다. 이 설정으로 과도한 쓰기나 읽기를 막을 수 있다. 

슬라이딩 윈도우 기반의 파티셔닝 

조건절을 기반으로 분할 할 수 있는 방법이다. 

SQL 데이터 베이스 쓰기 

 

텍스트 파일 

일반 텍스트 파일을 읽는 작업을 수행할 수 있다. 파일의 각 줄은 DataFrame의 레코드가 된다. 

텍스트 파일 읽기 

spark.read.textFile("/data/2010-summary.csv")\
    .selectExpr("split(value, ',') as rows").show()

텍스트 파일 쓰기 

텍스트 파일을 쓸 때는 문자열 칼럼이 하나만 존재해야 한다. 

csvFile.select("DEST_COUNTRY_NAME").write.text("/tmp/simple-text.txt")

고급 I/O 개념 

병렬로 데이터 쓰기 

파일이나 데이터 수는 데이터를 쓰는 시점에 DataFrame 이 가진 파티션 수에 따라 달라질 수 있다. 기본적으로 데이터 파티션 당 하나의 파일이 작성된다. 아래의 예시는 폴더 안에 5개의 파일을 생성한다. 

csvFile.repartition(5).write.format("csv").save("/tmp/multiple.csv")

 

데이터 형식에 대한 숨겨저있는 의미나, 특성까지 알아볼 수 있어서 유익한 단원이었다. 

728x90