DATA INGESTION & ETL PIPELINE

Data Source (MongoDB collection)
    ↓
Data Extraction (export as DataFrame)
    ↓
Schema Validation (against schema.yaml)
    ↓
Train/Test Split (80/20 ratio)

ETL Components

Data Schema Validation

Enforces strict schema compliance against schema.yaml, checking column names and dtypes so that only well-formed data moves downstream

# schema.yaml (sample)
columns:
  - having_IP_Address: int64
  - URL_Length: int64
  - Shortining_Service: int64
  - having_At_Symbol: int64
  ...
numerical_columns:
  - having_IP_Address
  - URL_Length
  ...
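
The validation logic itself isn't shown in the source. A minimal sketch, assuming the component loads schema.yaml and compares its declared columns against the ingested DataFrame (the function name is hypothetical):

import yaml
import pandas as pd

def validate_number_of_columns(dataframe: pd.DataFrame, schema_path: str = "schema.yaml") -> bool:
    # Load the declared schema (each "columns" entry is a {column_name: dtype} mapping)
    with open(schema_path, "r") as f:
        schema = yaml.safe_load(f)
    expected_columns = [list(col.keys())[0] for col in schema["columns"]]
    # Validation passes only if the DataFrame carries exactly the expected columns
    return list(dataframe.columns) == expected_columns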

Data Transformation

  • Handling missing values with KNNImputer
  • Type casting & normalization
  • Replacing -1 values with 0 in the target column
  • Serializing the fitted preprocessor for reuse at inference time (sketched below)
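
A minimal sketch of these steps, assuming a scikit-learn Pipeline wrapping KNNImputer and a pickled preprocessor artifact (the n_neighbors value and output path are illustrative):

import pickle
import pandas as pd
from sklearn.impute import KNNImputer
from sklearn.pipeline import Pipeline

def transform_data(train_df: pd.DataFrame, target_column: str):
    # Map the -1 labels to 0 so the target is a clean binary {0, 1}
    train_df[target_column] = train_df[target_column].replace(-1, 0)

    features = train_df.drop(columns=[target_column])
    # KNNImputer fills missing values from the nearest rows in feature space
    preprocessor = Pipeline(steps=[("imputer", KNNImputer(n_neighbors=3))])
    transformed = preprocessor.fit_transform(features)

    # Persist the fitted preprocessor so inference applies identical transforms
    with open("preprocessor.pkl", "wb") as f:
        pickle.dump(preprocessor, f)
    return transformed, train_df[target_column].to_numpy()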

Implementation Highlights

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

# Data Ingestion from MongoDB
def export_collection_as_dataframe(self):
    database_name = self.data_ingestion_config.database_name
    collection_name = self.data_ingestion_config.collection_name
    collection = self.mongo_client[database_name][collection_name]
    # Pull every document, drop Mongo's internal _id, and normalize "na" strings
    df = pd.DataFrame(list(collection.find()))
    if "_id" in df.columns.to_list():
        df = df.drop(columns=["_id"], axis=1)
    df.replace({"na": np.nan}, inplace=True)
    return df

# Train/Test Split Operation
def split_data_as_train_test(self, dataframe: pd.DataFrame):
    # Split according to the configured ratio (0.2 for an 80/20 split)
    train_set, test_set = train_test_split(
        dataframe, test_size=self.data_ingestion_config.train_test_split_ratio
    )
    train_set.to_csv(
        self.data_ingestion_config.training_file_path, index=False, header=True
    )
    test_set.to_csv(
        self.data_ingestion_config.testing_file_path, index=False, header=True
    )
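
Both methods read their settings from a data_ingestion_config object whose definition isn't shown here. A minimal sketch, based only on the attributes referenced above (the default values are illustrative):

from dataclasses import dataclass

@dataclass
class DataIngestionConfig:
    # Attribute names taken from the methods above; values are placeholders
    database_name: str = "network_security"
    collection_name: str = "phishing_data"
    training_file_path: str = "artifacts/train.csv"
    testing_file_path: str = "artifacts/test.csv"
    train_test_split_ratio: float = 0.2  # yields the 80/20 split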

Pipeline Features

S3 Synchronization

Automatically syncs processed data to AWS S3 for persistence and downstream access
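
The sync mechanism isn't detailed in the source; one common, minimal approach is to shell out to the AWS CLI (the class and method names here are assumptions):

import os

class S3Sync:
    # Requires the AWS CLI installed and credentials configured in the environment
    def sync_folder_to_s3(self, folder: str, aws_bucket_url: str) -> None:
        os.system(f"aws s3 sync {folder} {aws_bucket_url}")

    def sync_folder_from_s3(self, folder: str, aws_bucket_url: str) -> None:
        os.system(f"aws s3 sync {aws_bucket_url} {folder}")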

Drift Detection

Uses the two-sample Kolmogorov-Smirnov test to detect and report data drift between the train and test sets
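
A minimal sketch of such a check using scipy.stats.ks_2samp (the function name and the 0.05 significance threshold are assumptions):

from scipy.stats import ks_2samp
import pandas as pd

def detect_dataset_drift(base_df: pd.DataFrame, current_df: pd.DataFrame,
                         threshold: float = 0.05) -> dict:
    report = {}
    for column in base_df.columns:
        # Two-sample KS test: a small p-value means the two distributions differ
        result = ks_2samp(base_df[column], current_df[column])
        report[column] = {
            "p_value": float(result.pvalue),
            "drift_detected": bool(result.pvalue < threshold),
        }
    return report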

Exception Handling

Custom NetworkSecurityException class for robust error tracking and handling
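
The class body isn't shown in the source; a common pattern for such a wrapper, assuming it captures the failing file and line via sys.exc_info(), looks like this:

import sys

class NetworkSecurityException(Exception):
    def __init__(self, error_message, error_detail):
        super().__init__(error_message)
        self.error_message = error_message
        # Inspect the traceback of the original error to record where it happened
        _, _, exc_tb = error_detail.exc_info()
        self.lineno = exc_tb.tb_lineno
        self.file_name = exc_tb.tb_frame.f_code.co_filename

    def __str__(self):
        return (f"Error in [{self.file_name}] at line [{self.lineno}]: "
                f"{self.error_message}")

Under this pattern, the exception would typically be raised from an except block as raise NetworkSecurityException(e, sys), so the traceback of the original error is still live when it is inspected.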