ํ ์ค ์ ์
๐ Shuffle = ๋ฐ์ดํฐ๊ฐ ๋คํธ์ํฌ๋ฅผ ํ๊ณ ๋ค๋ฅธ Executor๋ก “์ฌ๋ถ๋ฐฐ”๋๋ ๊ณผ์
์ shuffle์ด ์๊ธฐ๋?
Spark๋ ์๋ ๋ฐ์ดํฐ๋ฅผ **์ฌ๋ฌ ์กฐ๊ฐ(partition)**์ผ๋ก ๋๋ ์ ์ฒ๋ฆฌํด.
๊ทธ๋ฐ๋ฐ ์ด๋ค ์ฐ์ฐ์ ๊ฐ์ ํค๋ฅผ ๊ฐ์ง ๋ฐ์ดํฐ๊ฐ ํ ๊ณณ์ ๋ชจ์ฌ์ผ ํด.
๊ทธ๋ ๋ฐ์ํ๋ ๊ฒ shuffle์ผ.
shuffle์ด ์๊ธฐ๋ ๋ํ ์ฐ์ฐ
์๋ ๋์ค๋ฉด ๋ฌด์กฐ๊ฑด ์์ฌ ๐
- groupBy
- join
- orderBy
- distinct
- repartition
์:
df.groupBy("user_id").count()
๐ user_id๊ฐ ๊ฐ์ row๋ค์ ํ Executor๋ก ๋ชจ์์ผ ํจ
๐ ๊ทธ๋์ ๋ฐ์ดํฐ ์ด๋ ๋ฐ์ = shuffle
๊ทธ๋ฆผ ์์ด ๋น์ ๋ก ์ดํดํ๊ธฐ ๐
- ํ์๋ค์ด ๊ต์ค ์ฌ๋ฌ ๊ณณ์ ํฉ์ด์ ธ ์์
- “๊ฐ์ ๋ฐ๋ผ๋ฆฌ ๋ชจ์ฌ!” ๋ผ๊ณ ์ง์ํจ
๐ ํ์๋ค์ด ๊ต์ค์ ์ฎ๊ฒจ ๋ค๋
๐ ์ด ์ด๋์ด ๋ฐ๋ก shuffle
์ shuffle์ด ๋๋ฆฌ๋?
shuffle์ ๋์์ 3๊ฐ์ง๋ฅผ ์ โ
1๏ธโฃ ๋คํธ์ํฌ I/O
2๏ธโฃ ๋์คํฌ I/O (spill)
3๏ธโฃ serialization / deserialization
๐ CPU๋ณด๋ค ํจ์ฌ ๋๋ฆผ
๊ทธ๋์:
- Spark ์์ ๋๋ฆฌ๋ค = ๋๋ถ๋ถ shuffle ๋๋ฌธ
shuffle ์ / ํ ์ฐจ์ด
โ shuffle ์์
Executor A → ๋คํธ์ํฌ → Executor B
Executor C → ๋์คํฌ → Executor D
โ shuffle ์์
๊ฐ Executor๊ฐ ์๊ธฐ ๋ฐ์ดํฐ๋ง ์ฒ๋ฆฌ
shuffle์ด ์๊ฒผ๋์ง ์ด๋ป๊ฒ ์๋?
1๏ธโฃ ์ฝ๋๋ก ๋ฐ๋ก ๋ณด์
groupBy / join / orderBy
2๏ธโฃ Spark UI์์ ํ์ธ
- DAG์ shuffle boundary
- Stage๊ฐ ๋๋์ด ์์
๊ทธ๋์ ์ต์ ํ์ ํต์ฌ์?
๐ shuffle์ ์ค์ด๊ฑฐ๋, ํผํ๋ ๊ฒ
๋ํ ๋ฐฉ๋ฒ:
- ์์ ํ ์ด๋ธ broadcast join
- join ์ ์ filter, select
- ๋ถํ์ํ groupBy ์ ๊ฑฐ
- ์ ์ ํ partition ์ค๊ณ
ํ ๋ฌธ์ฅ ์์ฝ (๋ฉด์ ์ฉ โจ)
“Shuffle์ Spark์์ ์ฐ์ฐ์ ์ํด ๋ฐ์ดํฐ๋ฅผ Executor ๊ฐ์ ์ฌ๋ถ๋ฐฐํ๋ ๊ณผ์ ์ผ๋ก, ๋คํธ์ํฌ์ ๋์คํฌ I/O๊ฐ ๋ฐ์ํด ์ฑ๋ฅ ๋ณ๋ชฉ์ ์ฃผ์ ์์ธ์ด ๋ฉ๋๋ค.”
์ด๊ฒ๋ง ๊ธฐ์ตํด๋ ๋จ
- shuffle = ๋ฐ์ดํฐ ์ด๋
- ๋๋ฆผ = ๋คํธ์ํฌ + ๋์คํฌ
- Spark ์ต์ ํ = shuffle ์ต์ํ
๋ธ๋ก๋์บ์คํธ ์กฐ์ธ(broadcast join)
- *๋ธ๋ก๋์บ์คํธ ์กฐ์ธ(broadcast join)**์ Spark ์ต์ ํ์์ ํจ๊ณผ๊ฐ ์ ์ผ ํฐ ๊ธฐ์ ์ค ํ๋์ผ. ์์ฃผ ์ฝ๊ฒ ์ค๋ช ํ ๊ฒ.
ํ ์ค ์ ์
๐ ์์ ํ ์ด๋ธ์ ๋ชจ๋ Executor ๋ฉ๋ชจ๋ฆฌ์ ๋ณต์ฌํด์,ํฐ ํ ์ด๋ธ์ด “์ด๋ ์์ด” ์กฐ์ธํ๋ ๋ฐฉ์
์ ์ด๊ฒ ๋น ๋ฅด๋?
์ผ๋ฐ ์กฐ์ธ์ ๐
- ํฐ ํ ์ด๋ธ ↔ ํฐ ํ ์ด๋ธ
- ์๋ก ๋ฐ์ดํฐ๋ฅผ ์ด๋(shuffle) ํด์ผ ํจ → ๋๋ฆผ โ
๋ธ๋ก๋์บ์คํธ ์กฐ์ธ์ ๐
- ์์ ํ ์ด๋ธ์ ๋ฏธ๋ฆฌ ์ ํ
- ํฐ ํ ์ด๋ธ์ ์๊ธฐ ์๋ฆฌ์์ lookup๋ง ํจ
- → shuffle ์์ โ
๊ทธ๋ฆผ ์์ด ๋น์ ๋ก ์ดํดํ๊ธฐ ๐
- ํฐ ์ฑ (๋์ฉ๋ ๋ฐ์ดํฐ)์ด ์ฌ๋ฌ ๊ต์ค์ ๋๋์ด ์์
- ์์ ์ฐธ๊ณ ํ(์์ ํ ์ด๋ธ)๋ฅผ ๋ชจ๋ ๊ต์ค์ ํ ๋ถ์ฉ ๋ณต์ฌ
๐ ํ์(ํฐ ํ ์ด๋ธ)์ด ์ด๋ํ ํ์ ์์ด
๐ ์๊ธฐ ๊ต์ค์์ ๋ฐ๋ก ์ฐธ๊ณ ํ ๋ณด๊ณ ์กฐ์ธ
์ธ์ “์๋ค”๊ณ ํ๋?
์ ํํ ๊ธฐ์ค์ ์์ง๋ง ๋ณดํต ๐
- ์ MB ~ ์์ญ MB
- ์์ฒ ~ ์์ญ๋ง row
- ๋ฉ๋ชจ๋ฆฌ์ ์ถฉ๋ถํ ์ฌ๋ผ๊ฐ๋ ํฌ๊ธฐ
ํต์ฌ ๊ธฐ์ค: Executor ๋ฉ๋ชจ๋ฆฌ์ ์ฌ๋ฆด ์ ์๋?
์ฝ๋๋ก ๋ณด๋ฉด ๋ฐ๋ก ์ดํด๋จ
from pyspark.sql.functions import broadcast
df_big.join(
broadcast(df_small),
"user_id"
)
์ด ํ ์ค์ด ์๋ฏธํ๋ ๊ฒ ๐
๐ df_small์ ๋ชจ๋ Executor์ ๋ณต์ฌ
broadcast ์ ์ฐ๋ฉด ์ด๋ป๊ฒ ๋๋?
df_big.join(df_small, "user_id")
๐ Spark๊ฐ ํ๋จํด์:
- ๋ ๋ค ํฌ๋ฉด → shuffle join
- ์์ผ๋ฉด → ์๋ broadcast (์ ๋ ์๋ ์์)
๐ ๊ทธ๋์ ๋ช ์์ ์ผ๋ก broadcast ์ฐ๋ ๊ฒ ์์
์ธ์ ์ฐ๋ฉด ์ ๋๋? โ
- ํ ์ด๋ธ์ด ํผ (์๋ฐฑ MB ์ด์)
- Executor ๋ฉ๋ชจ๋ฆฌ ๋ถ์กฑ
- ์์ฃผ ๋ฐ๋๋ ๋ํ ์ฐจ์ ํ ์ด๋ธ
๐ ์ด๋ ์ฐ๋ฉด OOM(๋ฉ๋ชจ๋ฆฌ ๋ถ์กฑ)
์ค๋ฌด์์ ์์ฃผ ์ฐ๋ ํจํด
์์
- clickstream (์์ต row)
- product / user / category ํ ์ด๋ธ (์์)
clicks.join(
broadcast(products),
"product_id"
)
๐ ์ ์ ์ค ์ ์
Spark๋ ์๋์ผ๋ก ํด์ฃผ์ง ์๋?
๐ ๊ฐ๋์ ํด์ฃผ์ง๋ง, ๋ฏฟ์ง ์๋๋ค
- ํต๊ณ ๋ถ์ ํ
- threshold ์ด๊ณผ
- ๊ณํ ๋ณ๊ฒฝ
๐ ์ค์ํ join์ ์ง์ broadcast ์ง์
๋ฉด์ ์ฉ ํ ๋ฌธ์ฅ โจ
“Broadcast join์ ์์ ํ ์ด๋ธ์ Executor ๋ฉ๋ชจ๋ฆฌ์ ๋ณต์ฌํด
shuffle ์์ด ํฐ ํ ์ด๋ธ๊ณผ ์กฐ์ธํ๋ ๋ฐฉ์์ผ๋ก,
Spark ์ฑ๋ฅ์ ํฌ๊ฒ ๊ฐ์ ํ ์ ์์ต๋๋ค.”
์ด๊ฒ๋ง ๊ธฐ์ตํ๋ฉด ๋จ
- broadcast join = shuffle ์ ๊ฑฐ
- ์์ ํ ์ด๋ธ์ผ ๋๋ง
- ๋ฉ๋ชจ๋ฆฌ ์ถฉ๋ถํ ๋๋ง
- join ์ฑ๋ฅ ์ฒด๊ฐ ์ต๊ณ
'DataEngineering > Spark' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
| spark ui ์์ ๋ณ๋ชฉ์ฐพ๋๋ฒ (0) | 2026.01.30 |
|---|---|
| Spark ์ต์ ํ ์ฒดํฌ๋ฆฌ์คํธ (0) | 2026.01.30 |
| df.cache() ๋ฅผ ์ธ์ ์จ์ผํ๋? (0) | 2026.01.30 |
| Dataframe ์ด๋? (0) | 2026.01.30 |
| Lazy Evaluation ์ด๋? (0) | 2026.01.30 |