import numpy as np import pandas as pd import matplotlib.pyplot as plt import seaborn as sns from sklearn.model_selection import train_test_split from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier from sklearn.cluster import DBSCAN, KMeans from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, mean_squared_error from sklearn.preprocessing import StandardScaler from sklearn.neighbors import NearestNeighbors import warnings warnings.filterwarnings('ignore') class EarthquakeLocationPredictor: def __init__(self): self.location_model = None self.risk_model = None self.scaler = StandardScaler() self.cluster_model = None self.high_risk_zones = [] def generate_realistic_earthquake_data(self, n_samples=8000): """ Generate data gempa dengan pola geografis yang realistis Mensimulasi zona seismik Indonesia """ np.random.seed(42) # Define seismic zones in Indonesia (simplified) seismic_zones = [ # Sumatera (Ring of Fire) {'center_lat': 0.0, 'center_lon': 101.0, 'radius': 8, 'activity': 0.8}, # Jawa (Java) {'center_lat': -7.0, 'center_lon': 110.0, 'radius': 6, 'activity': 0.7}, # Sulawesi {'center_lat': -2.0, 'center_lon': 120.0, 'radius': 7, 'activity': 0.75}, # Papua {'center_lat': -4.0, 'center_lon': 138.0, 'radius': 5, 'activity': 0.6}, # Maluku {'center_lat': -3.0, 'center_lon': 129.0, 'radius': 4, 'activity': 0.85}, # Nusa Tenggara {'center_lat': -8.5, 'center_lon': 118.0, 'radius': 5, 'activity': 0.65} ] earthquakes = [] for i in range(n_samples): # Choose a seismic zone based on activity level zone_probs = [z['activity'] for z in seismic_zones] zone_probs = np.array(zone_probs) / np.sum(zone_probs) chosen_zone = np.random.choice(len(seismic_zones), p=zone_probs) zone = seismic_zones[chosen_zone] # Generate location within zone angle = np.random.uniform(0, 2*np.pi) radius = np.random.exponential(zone['radius']/3) lat = zone['center_lat'] + radius * np.cos(angle) lon = zone['center_lon'] + radius * np.sin(angle) # Clip to reasonable Indonesia bounds lat = np.clip(lat, -11, 6) lon = np.clip(lon, 95, 141) # Generate other features depth = np.random.exponential(50) # Most earthquakes are shallow magnitude = np.random.gamma(2, 1.5) + 2 # Realistic magnitude distribution # Historical seismic activity in the area (simplified) seismic_density = zone['activity'] + np.random.normal(0, 0.1) # Geological features distance_to_fault = np.random.exponential(20) plate_boundary_distance = np.random.exponential(50) elevation = np.random.normal(200, 500) # Temporal features days_since_last = np.random.exponential(30) recent_activity_count = np.random.poisson(zone['activity'] * 10) # Environmental indicators tectonic_stress = zone['activity'] * 0.5 + np.random.normal(0, 0.2) ground_deformation = np.random.normal(0, 0.1) earthquake = { 'latitude': lat, 'longitude': lon, 'depth_km': depth, 'magnitude': magnitude, 'seismic_density': seismic_density, 'distance_to_fault': distance_to_fault, 'plate_boundary_distance': plate_boundary_distance, 'elevation': elevation, 'days_since_last': days_since_last, 'recent_activity_count': recent_activity_count, 'tectonic_stress': tectonic_stress, 'ground_deformation': ground_deformation, 'zone_id': chosen_zone } earthquakes.append(earthquake) df = pd.DataFrame(earthquakes) # Create target: high risk areas (magnitude > 5.0) df['high_risk'] = (df['magnitude'] > 5.0).astype(int) return df def identify_high_risk_zones(self, df, eps=1.0, min_samples=10): """ Identifikasi zona berisiko tinggi menggunakan clustering """ high_risk_data = df[df['high_risk'] == 1] if len(high_risk_data) == 0: return [] # Clustering berdasarkan lokasi geografis coords = high_risk_data[['latitude', 'longitude']].values # DBSCAN untuk menemukan cluster gempa self.cluster_model = DBSCAN(eps=eps, min_samples=min_samples) clusters = self.cluster_model.fit_predict(coords) high_risk_zones = [] for cluster_id in set(clusters): if cluster_id == -1: # Skip noise points continue cluster_points = coords[clusters == cluster_id] center_lat = np.mean(cluster_points[:, 0]) center_lon = np.mean(cluster_points[:, 1]) # Calculate radius (standard deviation) distances = np.sqrt(np.sum((cluster_points - [center_lat, center_lon])**2, axis=1)) radius = np.std(distances) # Average risk metrics cluster_data = high_risk_data[clusters == cluster_id] avg_magnitude = cluster_data['magnitude'].mean() event_count = len(cluster_data) high_risk_zones.append({ 'center_lat': center_lat, 'center_lon': center_lon, 'radius': max(radius, 0.5), # Minimum radius 'avg_magnitude': avg_magnitude, 'event_count': event_count, 'risk_score': (avg_magnitude - 4.0) * event_count / 10 }) # Sort by risk score high_risk_zones.sort(key=lambda x: x['risk_score'], reverse=True) self.high_risk_zones = high_risk_zones return high_risk_zones def train_location_predictor(self, df): """ Melatih model untuk memprediksi lokasi gempa berikutnya """ # Features untuk prediksi lokasi feature_cols = ['seismic_density', 'distance_to_fault', 'plate_boundary_distance', 'elevation', 'days_since_last', 'recent_activity_count', 'tectonic_stress', 'ground_deformation'] X = df[feature_cols] y_lat = df['latitude'] y_lon = df['longitude'] y_risk = df['high_risk'] # Split data X_train, X_test, y_lat_train, y_lat_test, y_lon_train, y_lon_test, y_risk_train, y_risk_test = train_test_split( X, y_lat, y_lon, y_risk, test_size=0.2, random_state=42 ) # Scale features X_train_scaled = self.scaler.fit_transform(X_train) X_test_scaled = self.scaler.transform(X_test) # Train location prediction models self.lat_model = RandomForestRegressor(n_estimators=100, random_state=42) self.lon_model = RandomForestRegressor(n_estimators=100, random_state=42) self.risk_model = RandomForestClassifier(n_estimators=100, random_state=42) self.lat_model.fit(X_train_scaled, y_lat_train) self.lon_model.fit(X_train_scaled, y_lon_train) self.risk_model.fit(X_train_scaled, y_risk_train) # Predictions lat_pred = self.lat_model.predict(X_test_scaled) lon_pred = self.lon_model.predict(X_test_scaled) risk_pred = self.risk_model.predict(X_test_scaled) risk_proba = self.risk_model.predict_proba(X_test_scaled)[:, 1] # Calculate location prediction error (distance in km) location_errors = [] for i in range(len(lat_pred)): error_km = self.haversine_distance( y_lat_test.iloc[i], y_lon_test.iloc[i], lat_pred[i], lon_pred[i] ) location_errors.append(error_km) self.feature_names = feature_cols return { 'location_mae_km': np.mean(location_errors), 'location_rmse_km': np.sqrt(np.mean(np.array(location_errors)**2)), 'risk_accuracy': accuracy_score(y_risk_test, risk_pred), 'risk_report': classification_report(y_risk_test, risk_pred), 'test_results': { 'y_lat_test': y_lat_test, 'y_lon_test': y_lon_test, 'lat_pred': lat_pred, 'lon_pred': lon_pred, 'y_risk_test': y_risk_test, 'risk_pred': risk_pred, 'risk_proba': risk_proba, 'location_errors': location_errors } } def haversine_distance(self, lat1, lon1, lat2, lon2): """ Menghitung jarak antara dua titik koordinat dalam km """ R = 6371 # Earth's radius in km lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2]) dlat = lat2 - lat1 dlon = lon2 - lon1 a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2 c = 2 * np.arcsin(np.sqrt(a)) return R * c def predict_next_earthquake_location(self, seismic_conditions): """ Prediksi lokasi gempa berikutnya berdasarkan kondisi seismik """ if self.lat_model is None or self.lon_model is None: raise ValueError("Model belum dilatih. Jalankan train_location_predictor() dulu.") # Convert input to DataFrame input_df = pd.DataFrame([seismic_conditions]) # Ensure all features present for feature in self.feature_names: if feature not in input_df.columns: input_df[feature] = 0 input_df = input_df[self.feature_names] input_scaled = self.scaler.transform(input_df) # Predict location and risk pred_lat = self.lat_model.predict(input_scaled)[0] pred_lon = self.lon_model.predict(input_scaled)[0] risk_proba = self.risk_model.predict_proba(input_scaled)[0][1] # Find nearest high-risk zone nearest_zone = self.find_nearest_risk_zone(pred_lat, pred_lon) return { 'predicted_latitude': pred_lat, 'predicted_longitude': pred_lon, 'risk_probability': risk_proba, 'risk_level': self.get_risk_level(risk_proba), 'nearest_high_risk_zone': nearest_zone, 'location_name': self.get_location_name(pred_lat, pred_lon) } def find_nearest_risk_zone(self, lat, lon): """ Temukan zona berisiko tinggi terdekat """ if not self.high_risk_zones: return None min_distance = float('inf') nearest_zone = None for zone in self.high_risk_zones: distance = self.haversine_distance( lat, lon, zone['center_lat'], zone['center_lon'] ) if distance < min_distance: min_distance = distance nearest_zone = { 'distance_km': distance, 'center_lat': zone['center_lat'], 'center_lon': zone['center_lon'], 'avg_magnitude': zone['avg_magnitude'], 'risk_score': zone['risk_score'] } return nearest_zone def get_location_name(self, lat, lon): """ Estimasi nama lokasi berdasarkan koordinat (simplified) """ if -6 <= lat <= -5 and 106 <= lon <= 107: return "Jakarta Region" elif -8 <= lat <= -6 and 110 <= lon <= 113: return "Central Java" elif -1 <= lat <= 4 and 98 <= lon <= 105: return "Sumatra" elif -5 <= lat <= 1 and 119 <= lon <= 125: return "Sulawesi" elif -9 <= lat <= -7 and 115 <= lon <= 120: return "Nusa Tenggara" elif -5 <= lat <= -1 and 127 <= lon <= 132: return "Maluku" elif -6 <= lat <= -1 and 134 <= lon <= 141: return "Papua" else: return f"Coordinates ({lat:.2f}, {lon:.2f})" def get_risk_level(self, probability): """Level risiko berdasarkan probabilitas""" if probability < 0.2: return "RENDAH" elif probability < 0.4: return "SEDANG" elif probability < 0.7: return "TINGGI" else: return "SANGAT TINGGI" def plot_earthquake_map(self, df, results=None): """ Visualisasi peta gempa dan prediksi lokasi """ fig, axes = plt.subplots(2, 2, figsize=(20, 16)) # Plot 1: Historical earthquake distribution scatter = axes[0,0].scatter(df['longitude'], df['latitude'], c=df['magnitude'], s=df['magnitude']*5, cmap='Reds', alpha=0.6) axes[0,0].set_title('Distribusi Gempa Historis (berdasarkan magnitudo)') axes[0,0].set_xlabel('Longitude') axes[0,0].set_ylabel('Latitude') plt.colorbar(scatter, ax=axes[0,0], label='Magnitude') # Plot 2: High-risk zones if self.high_risk_zones: for zone in self.high_risk_zones[:10]: # Top 10 zones circle = plt.Circle((zone['center_lon'], zone['center_lat']), zone['radius'], fill=False, color='red', linewidth=2, alpha=0.7) axes[0,1].add_patch(circle) axes[0,1].plot(zone['center_lon'], zone['center_lat'], 'ro', markersize=8) axes[0,1].scatter(df['longitude'], df['latitude'], c=df['high_risk'], cmap='coolwarm', alpha=0.3, s=20) axes[0,1].set_title('Zona Berisiko Tinggi (Lingkaran Merah)') axes[0,1].set_xlabel('Longitude') axes[0,1].set_ylabel('Latitude') # Plot 3: Prediction accuracy (if results available) if results: test_results = results['test_results'] errors = test_results['location_errors'] axes[1,0].hist(errors, bins=30, alpha=0.7, color='skyblue') axes[1,0].set_title(f'Distribusi Error Prediksi Lokasi\nMean Error: {np.mean(errors):.1f} km') axes[1,0].set_xlabel('Error (km)') axes[1,0].set_ylabel('Frequency') # Plot 4: Feature importance (combined for location prediction) if hasattr(self, 'lat_model'): lat_importance = self.lat_model.feature_importances_ lon_importance = self.lon_model.feature_importances_ combined_importance = (lat_importance + lon_importance) / 2 indices = np.argsort(combined_importance)[::-1] axes[1,1].bar(range(len(combined_importance)), combined_importance[indices]) axes[1,1].set_title('Feature Importance untuk Prediksi Lokasi') axes[1,1].set_xticks(range(len(combined_importance))) axes[1,1].set_xticklabels([self.feature_names[i] for i in indices], rotation=45) plt.tight_layout() plt.show() def analyze_regional_risk(self, df): """ Analisis risiko per wilayah """ regions = [] # Define regional boundaries regional_bounds = { 'Jakarta': {'lat_range': (-6.5, -5.5), 'lon_range': (106, 107.5)}, 'Central Java': {'lat_range': (-8, -6), 'lon_range': (109, 112)}, 'Sumatra': {'lat_range': (-1, 4), 'lon_range': (98, 105)}, 'Sulawesi': {'lat_range': (-5, 1), 'lon_range': (119, 125)}, 'Nusa Tenggara': {'lat_range': (-10, -7), 'lon_range': (115, 120)}, 'Maluku': {'lat_range': (-6, -1), 'lon_range': (127, 132)}, 'Papua': {'lat_range': (-6, -1), 'lon_range': (134, 141)} } for region_name, bounds in regional_bounds.items(): mask = ((df['latitude'] >= bounds['lat_range'][0]) & (df['latitude'] <= bounds['lat_range'][1]) & (df['longitude'] >= bounds['lon_range'][0]) & (df['longitude'] <= bounds['lon_range'][1])) region_data = df[mask] if len(region_data) > 0: regions.append({ 'region': region_name, 'earthquake_count': len(region_data), 'high_risk_count': region_data['high_risk'].sum(), 'avg_magnitude': region_data['magnitude'].mean(), 'max_magnitude': region_data['magnitude'].max(), 'risk_percentage': (region_data['high_risk'].sum() / len(region_data)) * 100 }) return pd.DataFrame(regions).sort_values('risk_percentage', ascending=False) # Fungsi utama demonstrasi def main(): print("=== SISTEM PREDIKSI LOKASI GEMPA ===\n") # Initialize predictor predictor = EarthquakeLocationPredictor() # Generate realistic earthquake data print("1. Generating realistic earthquake data...") df = predictor.generate_realistic_earthquake_data(8000) print(f"Generated {len(df)} earthquake records") print(f"High-risk earthquakes: {df['high_risk'].sum()} ({df['high_risk'].mean():.1%})") # Identify high-risk zones print("\n2. Identifying high-risk zones...") high_risk_zones = predictor.identify_high_risk_zones(df) print(f"Found {len(high_risk_zones)} high-risk zones") print("\nTop 5 High-Risk Zones:") for i, zone in enumerate(high_risk_zones[:5]): print(f"{i+1}. Center: ({zone['center_lat']:.2f}, {zone['center_lon']:.2f})") print(f" Avg Magnitude: {zone['avg_magnitude']:.1f}, Events: {zone['event_count']}") print(f" Risk Score: {zone['risk_score']:.2f}") # Train location predictor print("\n3. Training location prediction model...") results = predictor.train_location_predictor(df) print(f"Location prediction MAE: {results['location_mae_km']:.1f} km") print(f"Location prediction RMSE: {results['location_rmse_km']:.1f} km") print(f"Risk prediction accuracy: {results['risk_accuracy']:.3f}") # Regional risk analysis print("\n4. Regional risk analysis:") regional_analysis = predictor.analyze_regional_risk(df) print(regional_analysis.to_string(index=False)) # Visualizations print("\n5. Generating visualizations...") predictor.plot_earthquake_map(df, results) # Example predictions print("\n6. Contoh Prediksi Lokasi Gempa:") # Example 1: High seismic activity area high_activity_conditions = { 'seismic_density': 0.8, 'distance_to_fault': 5.0, 'plate_boundary_distance': 10.0, 'elevation': 100, 'days_since_last': 5, 'recent_activity_count': 15, 'tectonic_stress': 0.75, 'ground_deformation': 0.1 } prediction1 = predictor.predict_next_earthquake_location(high_activity_conditions) print("\nKondisi Aktivitas Seismik Tinggi:") print(f"Prediksi Lokasi: {prediction1['location_name']}") print(f"Koordinat: ({prediction1['predicted_latitude']:.2f}, {prediction1['predicted_longitude']:.2f})") print(f"Probabilitas Risiko Tinggi: {prediction1['risk_probability']:.3f}") print(f"Level Risiko: {prediction1['risk_level']}") if prediction1['nearest_high_risk_zone']: zone = prediction1['nearest_high_risk_zone'] print(f"Zona Risiko Terdekat: {zone['distance_km']:.1f} km (Mag avg: {zone['avg_magnitude']:.1f})") # Example 2: Low seismic activity area low_activity_conditions = { 'seismic_density': 0.2, 'distance_to_fault': 50.0, 'plate_boundary_distance': 100.0, 'elevation': 500, 'days_since_last': 180, 'recent_activity_count': 2, 'tectonic_stress': 0.1, 'ground_deformation': -0.02 } prediction2 = predictor.predict_next_earthquake_location(low_activity_conditions) print("\nKondisi Aktivitas Seismik Rendah:") print(f"Prediksi Lokasi: {prediction2['location_name']}") print(f"Koordinat: ({prediction2['predicted_latitude']:.2f}, {prediction2['predicted_longitude']:.2f})") print(f"Probabilitas Risiko Tinggi: {prediction2['risk_probability']:.3f}") print(f"Level Risiko: {prediction2['risk_level']}") # Interactive prediction print("\n7. Prediksi Lokasi Interaktif:") print("Masukkan kondisi seismik untuk prediksi lokasi:") try: user_conditions = { 'seismic_density': float(input("Kepadatan seismik (0-1): ") or "0.5"), 'distance_to_fault': float(input("Jarak ke patahan (km): ") or "20"), 'plate_boundary_distance': float(input("Jarak ke batas lempeng (km): ") or "50"), 'elevation': float(input("Elevasi (m): ") or "200"), 'days_since_last': int(input("Hari sejak gempa terakhir: ") or "30"), 'recent_activity_count': int(input("Jumlah aktivitas terkini: ") or "5"), 'tectonic_stress': float(input("Tekanan tektonik (0-1): ") or "0.3"), 'ground_deformation': float(input("Deformasi tanah: ") or "0.0") } user_prediction = predictor.predict_next_earthquake_location(user_conditions) print(f"\n=== HASIL PREDIKSI LOKASI ===") print(f"Lokasi Prediksi: {user_prediction['location_name']}") print(f"Koordinat: ({user_prediction['predicted_latitude']:.3f}, {user_prediction['predicted_longitude']:.3f})") print(f"Probabilitas Risiko Tinggi: {user_prediction['risk_probability']:.3f}") print(f"Level Risiko: {user_prediction['risk_level']}") if user_prediction['nearest_high_risk_zone']: zone = user_prediction['nearest_high_risk_zone'] print(f"Zona Risiko Terdekat: {zone['distance_km']:.1f} km dari pusat prediksi") print(f"Magnitudo rata-rata zona: {zone['avg_magnitude']:.1f}") print(f"Skor risiko zona: {zone['risk_score']:.2f}") except KeyboardInterrupt: print("\nProgram dihentikan.") except Exception as e: print(f"Error: {e}") print("\n=== PREDIKSI LOKASI SELESAI ===") if __name__ == "__main__": main()