Custom Scrapy Pipeline - JsonLinePipeline with Data Feed
Author: Shelton Ma
Create CustomJsonLinesItemExporter
```python
import os
import gzip
import shutil
import datetime

from scrapy.exporters import JsonLinesItemExporter


class CustomJsonLinesItemExporter(JsonLinesItemExporter):
    """
    Custom JsonLinesItemExporter with additional functionality:
    - Adds `export_item_count` to control the maximum size of a single file.
    - Implements `finish_exporting` to finalize the export process.
    - Automatically compresses the output file with gzip after writing.

    Attributes:
    - cday: Facilitates quick file splitting based on the day.
    - export_item_count: Enables splitting files by the number of lines.
    """

    def __init__(self, file, **kwargs):
        super().__init__(file, **kwargs)
        self.cday = datetime.datetime.now().day
        self.export_item_count = 0

    def export_item(self, item):
        super().export_item(item)
        self.export_item_count += 1

    def finish_exporting(self):
        self.file.close()
        self.compress_file()

    def compress_file(self):
        with open(self.file.name, 'rb') as f_in:
            with gzip.open(f'{self.file.name}.gz', 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        os.unlink(self.file.name)
```
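Before wiring the exporter into a pipeline, you can exercise it on its own. Below is a minimal sketch (the output file name and sample records are hypothetical) showing that each exported dict becomes one JSON line and that `finish_exporting` replaces the plain file with a gzip-compressed one.

```python
# Standalone smoke test for CustomJsonLinesItemExporter.
# 'demo_topic+0.json' and the sample records are placeholders.
records = [
    {"id": 1, "name": "alice"},
    {"id": 2, "name": "bob"},
]

f = open('demo_topic+0.json', 'wb')
exporter = CustomJsonLinesItemExporter(f)
exporter.start_exporting()
for record in records:
    exporter.export_item(record)       # one JSON object per line
print(exporter.export_item_count)      # -> 2
exporter.finish_exporting()            # closes the file, writes demo_topic+0.json.gz,
                                       # and removes the uncompressed .json
```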
Create pipeline
```python
import datetime
import os
from pathlib import Path

from scrapy.exceptions import NotConfigured

from scrapy_tools.exporters import CustomJsonLinesItemExporter


class JsonLinePipeline:
    """
    Writes data to a local file in JSON Lines format.

    Features:
    - Processes `item` objects, specifically `KafkaItem`, which must include two
      fields: `kafka_topic` and `content`.
    - File path format for saving:
      - For topics like '{dbname}_to_{tbname}':
        `{dbname}/{topic}/year=2023/month=04/day=25/{topic}+{timestamp}.json.gz`
      - Example:
        `/mercari/mercari_to_app_user/year=2023/month=05/day=25/mercari_to_app_user+1684986466+865256.json.gz`
    - Uses `CustomJsonLinesItemExporter` for writing, with each `content` field
      occupying a single line.
    - Splits files based on `EXPORT_BATCH_ITEM_COUNT` and separates them by date.

    Parameters:
    - `EXPORT_BATCH_ITEM_COUNT`: Default is 1,000,000; configurable in `settings.py`.
    - `EXPORT_BASE_PATH`: No usable default; must be configured in `settings.py`.
    """

    def __init__(self):
        self.exporter = {}
        self.batch_item_count = None
        self.base_path = '.'

    def open_spider(self, spider):
        self.batch_item_count = spider.settings.getint(
            'EXPORT_BATCH_ITEM_COUNT', 1000000)
        if spider.settings.get('EXPORT_BASE_PATH') is None:
            raise NotConfigured(
                "EXPORT_BASE_PATH is not configured. Files cannot be written "
                "without it; please set it in settings.py!")
        self.base_path = spider.settings.get('EXPORT_BASE_PATH', '/data/output')

    def close_spider(self, spider):
        for exporter in self.exporter.values():
            exporter.finish_exporting()

    def _exporter_for_item(self, item):
        """
        Initializes CustomJsonLinesItemExporter.

        If an exporter for the topic already exists but has reached the batch
        limit (`export_item_count >= self.batch_item_count`) or the day has
        changed, it is finalized and a new exporter instance is created.
        """
        kafka_topic = item.get('kafka_topic')
        if kafka_topic in self.exporter:
            exporter = self.exporter[kafka_topic]
            if exporter.export_item_count >= self.batch_item_count:
                exporter.finish_exporting()
                self.exporter.pop(kafka_topic, None)
            elif exporter.cday != datetime.datetime.now().day:
                exporter.finish_exporting()
                self.exporter.pop(kafka_topic, None)

        if kafka_topic not in self.exporter:
            now = datetime.datetime.now()
            db_name = kafka_topic.split('_to_')[0]
            sub_path = f'{db_name}/{kafka_topic}/year={now.year:04d}/month={now.month:02d}/day={now.day:02d}'
            file_name = f'{kafka_topic}+{str(now.timestamp()).replace(".", "+")}.json'
            output_path = os.path.join(self.base_path, sub_path, file_name)

            dirname = Path(output_path).parent
            if dirname and not dirname.exists():
                dirname.mkdir(parents=True)

            json_file = Path(output_path).open('wb')
            exporter = CustomJsonLinesItemExporter(json_file)
            self.exporter[kafka_topic] = exporter

        return self.exporter[kafka_topic]

    def process_item(self, item, spider):
        exporter = self._exporter_for_item(item)
        if (isinstance(item['content'], dict)
                and item.get('content').get('crawler_batch_id') is None
                and hasattr(spider, "crawler_batch_id")):
            item['content']['crawler_batch_id'] = spider.crawler_batch_id
        exporter.export_item(item.get('content'))
        return item
```
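As a quick way to see the batching and path layout in action outside a real crawl, here is a hypothetical smoke test. The stub spider, the `/tmp/rawdata` base path, the tiny batch size, and the `demo_to_users` topic are all assumptions for illustration; plain dicts stand in for the `KafkaItem` defined below.

```python
# Hypothetical smoke test for JsonLinePipeline (not part of the original project).
from scrapy.settings import Settings


class _StubSpider:
    # Only the `settings` attribute is needed by open_spider().
    settings = Settings({
        'EXPORT_BASE_PATH': '/tmp/rawdata',   # assumed writable path
        'EXPORT_BATCH_ITEM_COUNT': 2,         # tiny batch to force file splitting
    })


spider = _StubSpider()
pipeline = JsonLinePipeline()
pipeline.open_spider(spider)
for i in range(5):
    item = {'kafka_topic': 'demo_to_users', 'content': {'id': i}}
    pipeline.process_item(item, spider)
pipeline.close_spider(spider)
# Expected result: three .json.gz files (2 + 2 + 1 items) under
# /tmp/rawdata/demo/demo_to_users/year=YYYY/month=MM/day=DD/
```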
Create your item and ensure it has two fields: kafka_topic and content
```python
import scrapy


class KafkaItem(scrapy.Item):
    kafka_topic = scrapy.Field()
    content = scrapy.Field()
```
Use JsonLinePipeline in your Scrapy project
```python
# src/xxx_scrapy/settings.py
ITEM_PIPELINES = {
    'xxx_scrapy.pipelines.JsonLinePipeline': 300,
}
EXPORT_BASE_PATH = '/rawdata'
EXPORT_BATCH_ITEM_COUNT = 1000000
```

```python
# src/xxx_scrapy/spiders/myspider.py
for content in datas:
    kafka_item = KafkaItem()
    kafka_item['kafka_topic'] = "dbname_to_tablename"
    kafka_item['content'] = content
    yield kafka_item
```
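For context, here is a minimal, hypothetical spider showing where that yield loop typically lives. The spider name, start URL, JSON response handling, and the `xxx_scrapy.items` import path are placeholders, not part of the original project.

```python
# src/xxx_scrapy/spiders/myspider.py -- hypothetical end-to-end sketch
import scrapy

from xxx_scrapy.items import KafkaItem  # assumed location of KafkaItem


class MySpider(scrapy.Spider):
    name = 'myspider'
    start_urls = ['https://example.com/api/users']  # placeholder endpoint

    def parse(self, response):
        # Assume the endpoint returns a JSON array of records.
        datas = response.json()
        for content in datas:
            kafka_item = KafkaItem()
            kafka_item['kafka_topic'] = "dbname_to_tablename"
            kafka_item['content'] = content
            yield kafka_item
```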
Sync files to S3
```bash
aws s3 sync {prefix} s3://bucket_name/{prefix.replace("/rawdata/", "")} --exclude "*.json"
```
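If you drive the sync from Python (for example, in a scheduled job after the crawl finishes), the command above can be built and invoked roughly as follows. The bucket name, prefix, and use of `subprocess` are assumptions for illustration.

```python
# Hypothetical Python wrapper around the `aws s3 sync` command shown above.
import subprocess


def sync_to_s3(prefix: str, bucket: str = "bucket_name") -> None:
    """Sync a local rawdata prefix to S3, skipping uncompressed .json files."""
    dest = f"s3://{bucket}/{prefix.replace('/rawdata/', '')}"
    subprocess.run(
        ["aws", "s3", "sync", prefix, dest, "--exclude", "*.json"],
        check=True,
    )


# Example: sync one day's output for a single topic.
sync_to_s3("/rawdata/mercari/mercari_to_app_user/year=2023/month=05/day=25/")
```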
If you want to automatically sync data to S3, see the post "sync rawdata to S3" for more details.