Custom Scrapy Pipeline - JsonLinePipeline with Data Feed

Author: Shelton Ma

  1. Create CustomJsonLinesItemExporter

    import os
    import gzip
    import shutil
    import datetime
    from scrapy.exporters import JsonLinesItemExporter
    
    class CustomJsonLinesItemExporter(JsonLinesItemExporter):
        """
        Custom JsonLinesItemExporter with additional functionality:
        - Adds `export_item_count` to control the maximum size of a single file.
        - Implements `finish_exporting` to finalize the export process.
        - Automatically compresses the output file with gzip after writing.
    
        Attributes:
        - cday: Facilitates quick file splitting based on the day.
        - export_item_count: Enables splitting files by the number of lines.
        """
    
        def __init__(self, file, **kwargs):
            super().__init__(file, **kwargs)
            self.cday = datetime.datetime.now().day
            self.export_item_count = 0
    
        def export_item(self, item):
            super().export_item(item)
            self.export_item_count += 1
    
        def finish_exporting(self):
            # Close the output file, then gzip it in place.
            self.file.close()
            self.compress_file()

        def compress_file(self):
            # Gzip the finished file and remove the uncompressed original.
            with open(self.file.name, 'rb') as f_in:
                with gzip.open(f'{self.file.name}.gz', 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
            os.unlink(self.file.name)
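
    As a quick sanity check, the exporter can be used on its own, outside any pipeline. The snippet below is a minimal sketch with a made-up file name and items; it writes two JSON lines and leaves behind `demo.json.gz`:

    # Standalone usage sketch (hypothetical file name and items).
    file = open('demo.json', 'wb')
    exporter = CustomJsonLinesItemExporter(file)
    exporter.start_exporting()        # a no-op for JSON lines, kept for idiomatic exporter usage
    exporter.export_item({'id': 1, 'name': 'foo'})
    exporter.export_item({'id': 2, 'name': 'bar'})
    exporter.finish_exporting()       # closes demo.json and replaces it with demo.json.gz
    assert exporter.export_item_count == 2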
    
  2. Create pipeline

    import datetime
    from pathlib import Path
    import os
    from scrapy.exceptions import NotConfigured
    from scrapy_tools.exporters import CustomJsonLinesItemExporter
    
    class JsonLinePipeline:
        """
        Writes data to a local file in JSON format.
    
        Features:
        - Processes `item` objects, specifically `KafkaItem`, which must include two fields: `kafka_topic` and `content`.
        - File path format for saving:
        - For topics like '{dbname}_to_{tbname}':
            `{dbname}/{tbname}/year=2023/month=04/day=25/{topic}+{timestamp()}.json.gz`
        - Example: `/mercari/mercari_to_app_user/year=2023/month=05/day=25/mercari_to_app_user+1684986466+865256.json.gz`
        - Utilizes `JsonLinesItemExporter` for writing, with each `content` field occupying a single line.
        - Splits files based on `EXPORT_BATCH_ITEM_COUNT` and separates them by date.
    
        Parameters:
        - `EXPORT_BATCH_ITEM_COUNT`: Default is 1,000,000; configurable in `settings.py`.
        - `EXPORT_BASE_PATH`: Default is '.', configurable in `settings.py`.
        """
    
        def __init__(self):
            self.exporter = {}  # maps kafka_topic -> CustomJsonLinesItemExporter
            self.batch_item_count = None
            self.base_path = '.'
    
        def open_spider(self, spider):
            self.batch_item_count = spider.settings.getint(
                'EXPORT_BATCH_ITEM_COUNT', 1000000)
            if spider.settings.get('EXPORT_BASE_PATH') is None:
                raise NotConfigured(
                    "EXPORT_BASE_PATH is not configured. Set it in settings.py "
                    "(e.g. '/data/output'), otherwise file writes will fail.")
            self.base_path = spider.settings.get('EXPORT_BASE_PATH')
    
        def close_spider(self, spider):
            for exporter in self.exporter.values():
                exporter.finish_exporting()
    
        def _exporter_for_item(self, item):
            """
            Initializes CustomJsonLinesItemExporter.
    
            If the exporter has already been initialized and data has been written
            (`self.export_item_count >= self.batch_item_count`), a new exporter instance is created.
            """
            kafka_topic = item.get('kafka_topic')
            if kafka_topic in self.exporter:
                exporter = self.exporter[kafka_topic]
                # Rotate the file when the batch is full or the day has changed.
                if (exporter.export_item_count >= self.batch_item_count
                        or exporter.cday != datetime.datetime.now().day):
                    exporter.finish_exporting()
                    self.exporter.pop(kafka_topic, None)
    
            if kafka_topic not in self.exporter:
                now = datetime.datetime.now()
                db_name = kafka_topic.split('_to_')[0]
                # Partition the output path by database, topic and date.
                sub_path = f'{db_name}/{kafka_topic}/year={now.year:04d}/month={now.month:02d}/day={now.day:02d}'
                file_name = f'{kafka_topic}+{str(now.timestamp()).replace(".", "+")}.json'
                output_path = os.path.join(self.base_path, sub_path, file_name)
                Path(output_path).parent.mkdir(parents=True, exist_ok=True)
                json_file = Path(output_path).open('wb')
                exporter = CustomJsonLinesItemExporter(json_file)
                self.exporter[kafka_topic] = exporter
            return self.exporter[kafka_topic]
    
        def process_item(self, item, spider):
            exporter = self._exporter_for_item(item)
            # Stamp the spider's crawler_batch_id onto the content if it is missing.
            if (isinstance(item['content'], dict)
                    and item['content'].get('crawler_batch_id') is None
                    and hasattr(spider, 'crawler_batch_id')):
                item['content']['crawler_batch_id'] = spider.crawler_batch_id
            exporter.export_item(item.get('content'))
            return item
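
    To see how the pieces fit together, the pipeline can be exercised outside of a crawl. The sketch below is illustrative only: it stubs the spider with a `SimpleNamespace`, uses a temporary base path, and sets a tiny batch size so the file split is visible after three items (`KafkaItem` is the item class defined in step 3):

    # Illustrative sketch: run the pipeline by hand with a stubbed spider and a tiny batch size.
    from types import SimpleNamespace
    from scrapy.settings import Settings

    settings = Settings({'EXPORT_BASE_PATH': '/tmp/rawdata', 'EXPORT_BATCH_ITEM_COUNT': 2})
    spider = SimpleNamespace(settings=settings, crawler_batch_id='batch-001')

    pipeline = JsonLinePipeline()
    pipeline.open_spider(spider)
    for i in range(3):
        item = KafkaItem(kafka_topic='demo_to_users', content={'id': i})
        pipeline.process_item(item, spider)
    pipeline.close_spider(spider)
    # /tmp/rawdata/demo/demo_to_users/year=.../month=.../day=... now contains two .json.gz files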
    
    
  3. Create your item; make sure it has two fields: kafka_topic and content

    import scrapy
    
    class KafkaItem(scrapy.Item):
        kafka_topic = scrapy.Field()
        content = scrapy.Field()
    
  4. Use JsonLinePipeline in your Scrapy project

    # src/xxx_scrapy/settings.py
    ITEM_PIPELINES = {
        'xxx_scrapy.pipelines.JsonLinePipeline': 300,
    }
    EXPORT_BASE_PATH = '/rawdata'
    EXPORT_BATCH_ITEM_COUNT = 1000000
    
    # src/xxx_scrapy/spiders/myspider.py (inside a parse callback)
    for content in datas:
        kafka_item = KafkaItem()
        kafka_item['kafka_topic'] = "dbname_to_tablename"
        kafka_item['content'] = content
        yield kafka_item
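
    For context, here is a minimal spider sketch showing where the snippet above fits; the spider name, start URL, import path, and response parsing are made up for illustration:

    # Hypothetical minimal spider that yields KafkaItem objects for JsonLinePipeline.
    import scrapy
    from xxx_scrapy.items import KafkaItem   # assumed location of the item class from step 3

    class MySpider(scrapy.Spider):
        name = 'myspider'
        start_urls = ['https://example.com/api/users']     # made-up endpoint
        crawler_batch_id = 'batch-20230525'                 # picked up by the pipeline if present

        def parse(self, response):
            for content in response.json():                 # assumes the endpoint returns a JSON list
                kafka_item = KafkaItem()
                kafka_item['kafka_topic'] = 'dbname_to_tablename'
                kafka_item['content'] = content
                yield kafka_item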
    
  5. Sync files to S3

    # {prefix} is a directory under /rawdata; stripping "/rawdata/" yields the S3 key prefix.
    # --exclude "*.json" skips any uncompressed .json files so only the .json.gz output is uploaded.
    aws s3 sync {prefix} s3://bucket_name/{prefix.replace("/rawdata/", "")} --exclude "*.json"
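
    If you prefer to drive the sync from Python (for example from a cron job), a simple sketch that shells out to the AWS CLI might look like the following; the bucket name and directory layout are placeholders:

    # Sketch: sync every {dbname}/{topic} directory under /rawdata to S3 via the AWS CLI.
    import subprocess
    from pathlib import Path

    base = Path('/rawdata')
    bucket = 's3://bucket_name'   # placeholder bucket

    for prefix in sorted(p for p in base.glob('*/*') if p.is_dir()):
        key_prefix = str(prefix).replace('/rawdata/', '')
        subprocess.run(
            ['aws', 's3', 'sync', str(prefix), f'{bucket}/{key_prefix}', '--exclude', '*.json'],
            check=True,
        )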
    

If you want to automatically sync data to S3, see sync rawdata to S3 for more details.