BigData TRJ Series ตอนที่ 1: สถาปัตยกรรมระบบประมวลผลวิถีอากาศอัตโนมัติ HYSPLIT (Architecture & Overview)
เจาะลึกสถาปัตยกรรมของระบบ BigData TRJ — Pipeline อัตโนมัติที่ใช้ HYSPLIT, GFS Meteorology Data และ PSCF Analysis เพื่อวิเคราะห์แหล่งกำเนิดฝุ่น PM2.5 ในภาคเหนือ
Problem
ภาคเหนือของไทยเผชิญปัญหาฝุ่น PM2.5 รุนแรงทุกปี แต่การวิเคราะห์แหล่งกำเนิดฝุ่นด้วยแบบจำลอง HYSPLIT ยังทำแบบ Manual ไม่ต่อเนื่อง ต้องใช้ทรัพยากรบุคคลสูง และไม่มีระบบติดตามผลแบบ Real-time
Solution
ออกแบบและพัฒนา Pipeline อัตโนมัติแบบ 6-Stage ด้วย Python + Docker ที่ดาวน์โหลดข้อมูล GFS อัตโนมัติ รัน Backward/Forward Trajectories พร้อมคำนวณ PSCF โดยไม่ต้องใช้คนดูแลตลอด 24 ชั่วโมง
Impact
ลดเวลาการประมวลผลจากหลายวันเหลือ 5-10 นาทีต่อรอบ รองรับการรันอัตโนมัติทุก 6 ชั่วโมงตลอด 24 ชม. และเชื่อมต่อกับ Dashboard แบบ Real-time
BigData TRJ Series ตอนที่ 1: สถาปัตยกรรมระบบประมวลผลวิถีอากาศอัตโนมัติ HYSPLIT
ในฐานะ ผู้ช่วยนักวิจัย และผู้รับผิดชอบระบบ BigData TRJ ของมหาวิทยาลัยชั้นนำแห่งหนึ่งในภาคเหนือ หนึ่งในความท้าทายที่ใหญ่ที่สุดของผมคือการออกแบบระบบอัตโนมัติที่สามารถประมวลผลข้อมูลอุตุนิยมวิทยาและวิเคราะห์แหล่งกำเนิดฝุ่น PM2.5 ได้ตลอด 24 ชั่วโมง โดยไม่ต้องพึ่งพาทีมงานคอยมอนิเตอร์ตลอดเวลา
บทความนี้จะพาไปดูภาพรวมของระบบ และเหตุผลที่เราเลือกสถาปัตยกรรมแบบนี้
1. ปัญหา: ทำไมต้องใช้ระบบอัตโนมัติ?
ทุกปี ช่วงเดือนมกราคมถึงเมษายน ภาคเหนือของไทยต้องเผชิญกับวิกฤตฝุ่น PM2.5 ที่มีสาเหตุหลักจาก ไฟป่าและการเผาในที่โล่ง การทำความเข้าใจ “ทิศทางและแหล่งกำเนิดของฝุ่น” มีความสำคัญต่อการวางมาตรการป้องกันและแจ้งเตือนประชาชน
ก่อนหน้านี้ การวิเคราะห์ใช้วิธี:
- ดาวน์โหลดข้อมูล GFS จาก NOAA ด้วยมือ
- รัน HYSPLIT ทีละสถานี
- วิเคราะห์ผลลัพธ์แบบ Manual
ซึ่งกินเวลาหลายวันและมีโอกาสผิดพลาดสูง
หมายเหตุ: โค้ด Pipeline หลัก (1.py - 6.py) ได้รับต่อยอดจากนักวิจัยข้อมูลชาวต่างชาติ (Data Analyst) ซึ่งเป็นผู้เขียน Logic หลักในการคำนวณ Trajectory และ PSCF ส่วนบทบาทของผมคือการออกแบบ Infrastructure ใหม่ทั้งหมด — Containerization ด้วย Docker, การทำ Parallel Processing, ระบบ Logging, Pipeline Reliability และระบบ Monitoring ต่างๆ ที่ทำให้โค้ดดั้งเดิมกลายเป็นระบบ Automation ที่พร้อม Deploy จริง
2. ภาพรวมระบบ (System Overview)
ระบบนี้ถูกออกแบบให้ทำงานเป็น Daemon รันวนรอบทุก 6 ชั่วโมง โดยใช้ Python เป็นภาษาหลัก ทำงานบน Docker Container เพื่อความสะดวกในการ Deploy
flowchart LR
subgraph DS ["Data Sources"]
A[NOAA GFS 0.25°<br/>Global Forecast System]
B[FIRMS API<br/>Fire Hotspots]
end
subgraph PA ["Pipeline Automation"]
C[Stage 1: 1.py<br/>GFS Download]
D[Stage 2: 2.py<br/>Backward TRJ<br/>14 Stations]
E[Stage 3: 3.py<br/>Hotspot Download<br/>Cache Management]
F[Stage 4: 4.py<br/>Forward TRJ<br/>From Hotspots]
G[Stage 5: 5.py<br/>PSCF Province<br/>9 Provinces]
H[Stage 6: 6.py<br/>PSCF Overall<br/>All Stations]
end
subgraph OP ["Output"]
I[TRJ GeoJSON<br/>Backward Trajectories]
J[FWT GeoJSON<br/>Forward Trajectories]
K[PSCF Province<br/>Source Contribution]
L[PSCF Overall<br/>Regional Analysis]
end
A --> C
C --> D
B --> E
E --> F
D --> F
F --> G
G --> H
D --> I
F --> J
G --> K
H --> L
style DS stroke:#333,stroke-width:1px,color:#fff
style PA stroke:#333,stroke-width:1px,color:#fff
style OP stroke:#333,stroke-width:1px,color:#fff
style C fill:#334155,color:#fff
style D fill:#e65100,color:#fff
style E fill:#d97706,color:#fff
style F fill:#e65100,color:#fff
style G fill:#2e7d32,color:#fff
style H fill:#2e7d32,color:#fff
3. 6-Stage Pipeline — อธิบายทีละขั้นตอน
Stage 1: GFS Download (1.py)
ดาวน์โหลดข้อมูลอุตุนิยมวิทยา GFS (Global Forecast System) จาก NOAA ที่ความละเอียด 0.25 องศา ย้อนหลัง 3 วันล่าสุด ข้อมูลเหล่านี้จำเป็นสำหรับการคำนวณ Trajectory ในขั้นตอนถัดไป
- Protocol: HTTP Download จาก NOAA NOMADS Server
- Storage: เก็บใน
data/gfs/พร้อม Cleanup ไฟล์ที่อายุเกิน 30 วัน - Retry Logic: หากดาวน์โหลดไม่สำเร็จจะลองใหม่ พร้อมบันทึก Log
Stage 2: Backward Trajectory (2.py)
รัน HYSPLIT เพื่อคำนวณทิศทางลมย้อนหลัง 24 ชั่วโมง จากสถานีตรวจวัดคุณภาพอากาศ 14 แห่งในภาคเหนือ
flowchart LR
A[GFS Files<br/>Meteorology Data] --> B[ProcessPoolExecutor<br/>Parallel Runs]
C[14 Stations<br/>Lat/Lon Config] --> B
B --> D[HYSPLIT hyts_std<br/>Backward 24h]
D --> E[tdump files<br/>Raw Output]
E --> F[GeoJSON Generator<br/>Convert Format]
F --> G[TRJ GeoJSON<br/>Final Output]
style B fill:#ff9800,color:#fff
style D fill:#f44336,color:#fff
- Parallel Execution: ใช้
ProcessPoolExecutorเพื่อรัน HYSPLIT แบบขนาน — ลดเวลาได้มากกว่า 70% - Incremental Processing: ข้ามสถานีที่สร้าง GeoJSON ไว้แล้วโดยอัตโนมัติ
- Future Safety Buffer: ไม่รัน Trajectory สำหรับเวลาที่เกินกว่าปัจจุบัน +6 ชั่วโมง
Stage 3: Hotspot Download (3.py)
ดึงข้อมูลจุดความร้อน (Fire Hotspots) จากระบบ FIRMS (Fire Information for Resource Management System) เพื่อใช้ในการคำนวณ Forward Trajectory
- Data Format: CSV มีพิกัด Latitude/Longitude, เวลาที่ตรวจพบ, และความเชื่อมั่น (Confidence)
- Cache Management: เก็บข้อมูลในรูปแบบ NDJSON เพื่อป้องกันการดาวน์โหลดซ้ำ
- Auto Deduplication: (ถูกเพิ่มในเวอร์ชันหลัง) ตรวจสอบวันที่ซ้ำก่อนเขียน CSV
Stage 4: Forward Trajectory (4.py)
รัน HYSPLIT แบบไปข้างหน้า (Forward) จากจุดความร้อนที่ได้จาก Stage 3 เหมือนเป็นการ “ทำนาย” ว่าฝุ่นจะเคลื่อนที่ไปทางไหนใน 24 ชั่วโมงข้างหน้า
Stage 5 & 6: PSCF Analysis (5.py, 6.py)
คำนวณ Potential Source Contribution Function (PSCF) เพื่อหาความน่าจะเป็นของแหล่งกำเนิดฝุ่น
| Stage | ไฟล์ | รายละเอียด |
|---|---|---|
| 5 | pscf_province/ | คำนวณ PSCF แยกรายจังหวัด 9 จังหวัดภาคเหนือ |
| 6 | pscf_overall/ | คำนวณ PSCF ในภาพรวมของทุกสถานี |
sequenceDiagram
participant 5py as Stage 5: Province PSCF
participant TRJ as TRJ Data
participant HOT as Hotspot Data
participant 6py as Stage 6: Overall PSCF
participant OUT as Output GeoJSON
5py->>TRJ: Load Backward Trajectories
5py->>HOT: Load Fire Hotspots
5py->>5py: Spatial Join<br/>Grid-based Probability
5py->>OUT: Province PSCF GeoJSON
6py->>TRJ: Load All Trajectories
6py->>HOT: Load All Hotspots
6py->>6py: Aggregate Analysis
6py->>OUT: Overall PSCF GeoJSON
กลไกสำคัญ: หากวันไหนไม่มีจุดความร้อน (Zero Hotspots) ระบบจะสร้างไฟล์ Placeholder GeoJSON ที่มีสถานะ skipped แทนการ Error เพื่อให้ Dashboard ทำงานต่อได้
4. สถาปัตยกรรมทางเทคนิค (Technical Architecture)
Tech Stack
flowchart LR
subgraph INF ["Infrastructure"]
A[Docker<br/>Alpine/Debian]
B[Go Task<br/>Taskfile Orchestration]
end
subgraph RT ["Runtime"]
C[Python 3.12+]
D[HYSPLIT v5.4.2<br/>Linux Binary]
E[Custom Logger<br/>Daily Rotation]
end
subgraph LIB ["Libraries"]
F[Pandas + GeoPandas]
G[NumPy]
H[SMTP Email Alerts]
end
A --> C
A --> D
C --> F
C --> G
C --> H
C --> E
style INF stroke:#333,stroke-width:1px,color:#fff
style RT stroke:#333,stroke-width:1px,color:#fff
style LIB stroke:#333,stroke-width:1px,color:#fff
style A fill:#475569,color:#fff
style D fill:#f44336,color:#fff
- Language: Python 3.12+ (Pandas, GeoPandas, NumPy)
- Model Engine: HYSPLIT (Linux Binary จาก NOAA ARL)
- Container: Docker — เพื่อความ Portability สูงสุด
- Orchestration: Go Task (Taskfile.yml) — จัดการคำสั่ง Build/Run/Clean
- Scheduling: Infinite Loop ใน Python + Docker
restart: always
สถานีตรวจวัด 14 แห่ง
ระบบเชื่อมต่อกับสถานีตรวจวัดคุณภาพอากาศ 14 แห่งครอบคลุมพื้นที่ภาคเหนือ:
| ลำดับ | สถานี | จังหวัด |
|---|---|---|
| 1 | สถานีเชียงราย | เชียงราย |
| 2 | สถานีเชียงใหม่ | เชียงใหม่ |
| 3 | สถานีลำปาง | ลำปาง |
| 4 | สถานีลำพูน | ลำพูน |
| 5 | สถานีแพร่ | แพร่ |
| 6 | สถานีน่าน | น่าน |
| 7 | สถานีแม่ฮ่องสอน | แม่ฮ่องสอน |
| 8 | สถานีตาก | ตาก |
| 9 | สถานีสุโขทัย | สุโขทัย |
| 10 | สถานีพิษณุโลก | พิษณุโลก |
| 11 | สถานีเพชรบูรณ์ | เพชรบูรณ์ |
| 12 | สถานีอุตรดิตถ์ | อุตรดิตถ์ |
| 13 | สถานีพะเยา | พะเยา |
| 14 | สถานีกำแพงเพชร | กำแพงเพชร |
5. โครงสร้างระบบ (System Structure)
project-root/
├── src/ # Source code
│ ├── 1.py ~ 6.py # Pipeline stages
│ ├── entrypoint.py # Orchestrator (Daemon loop)
│ ├── config.py # Centralized config & constants
│ ├── logger.py # Custom logging (daily rotation)
│ └── alert.py # Email alert system
├── data/
│ ├── gfs/ # GFS meteorological files
│ ├── firms/ # Hotspot CSV files
│ ├── assets/ # Shapefiles & grid tables
│ ├── output/ # Final GeoJSON results
│ │ ├── trj/ # Backward trajectories
│ │ ├── fwt/ # Forward trajectories
│ │ └── pscf_*/ # PSCF analysis
│ ├── logs/ # Daily log rotation
│ └── runs/ # HYSPLIT temp working dirs
├── Dockerfile
├── Taskfile.yml
└── .env.prod
6. การทำงานแบบ Daemon และ Health Check
ระบบทำงานใน Daemon Mode — รันตลอดเวลาและทำ Pipeline ซ้ำทุก 6 ชั่วโมง พร้อมระบบตรวจสอบสุขภาพ:
sequenceDiagram
participant Container as Docker Container
participant Pipeline as Pipeline Engine
participant Logger as Log System
participant Alert as Email Alert
Note over Container: Container starts
Container->>Pipeline: Start Daemon Loop
loop Every 6 Hours
Pipeline->>Pipeline: Run Stages 1-6
Pipeline->>Logger: Write pipeline.log
alt Success
Logger->>Logger: Update health status<br/>Last success = now
else Error
Pipeline->>Alert: Send SMTP alert
Alert->>Alert: Email to operator
end
end
Note over Container: Docker restart: always
- Health Check: คำสั่ง
task checkจะตรวจสอบว่ารอบล่าสุดสำเร็จภายใน 12 ชม. ที่ผ่านมาหรือไม่ - Auto Restart: Docker
restart: alwaysทำให้ระบบกลับ up อัตโนมัติหลัง Server Reboot
ใน ตอนที่ 2 เราจะเจาะลึกบทเรียนจาก Pipeline จริง — Bugs ที่เจอ, การปรับ Performance, และเทคนิคการทำให้ Pipeline ทนทานต่อความผิดพลาด ทุกสถานการณ์แบบ Failure-proof!
ติดตามต่อในตอนที่ 2 ครับ!