
Source code for torchgeo.datasets.cyclone

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

"""Tropical Cyclone Wind Estimation Competition dataset."""

import json
import os
from functools import lru_cache
from typing import Any, Callable, Dict, Optional

import numpy as np
import pytorch_lightning as pl
import torch
from PIL import Image
from sklearn.model_selection import GroupShuffleSplit
from torch import Tensor
from torch.utils.data import DataLoader, Subset

from .geo import VisionDataset
from .utils import check_integrity, download_radiant_mlhub_dataset, extract_archive

# https://github.com/pytorch/pytorch/issues/60979
# https://github.com/pytorch/pytorch/pull/61045
DataLoader.__module__ = "torch.utils.data"


class TropicalCycloneWindEstimation(VisionDataset):
    """Tropical Cyclone Wind Estimation Competition dataset.

    A collection of tropical storms in the Atlantic and East Pacific Oceans from 2000 to
    2019 with corresponding maximum sustained surface wind speed. This dataset is split
    into training and test categories for the purpose of a competition.

    See https://www.drivendata.org/competitions/72/predict-wind-speeds/ for more
    information about the competition.

    If you use this dataset in your research, please cite the following paper:

    * http://doi.org/10.1109/JSTARS.2020.3011907

    .. note::

       This dataset requires the following additional library to be installed:

       * `radiant-mlhub <https://pypi.org/project/radiant-mlhub/>`_ to download the
         imagery and labels from the Radiant Earth MLHub
    """

    collection_id = "nasa_tropical_storm_competition"
    md5s = {
        "train": {
            "source": "97e913667a398704ea8d28196d91dad6",
            "labels": "97d02608b74c82ffe7496a9404a30413",
        },
        "test": {
            "source": "8d88099e4b310feb7781d776a6e1dcef",
            "labels": "d910c430f90153c1f78a99cbc08e7bd0",
        },
    }
    size = 366

    def __init__(
        self,
        root: str = "data",
        split: str = "train",
        transforms: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None,
        download: bool = False,
        api_key: Optional[str] = None,
        checksum: bool = False,
    ) -> None:
        """Initialize a new Tropical Cyclone Wind Estimation Competition Dataset.

        Args:
            root: root directory where dataset can be found
            split: one of "train" or "test"
            transforms: a function/transform that takes input sample and its target as
                entry and returns a transformed version
            download: if True, download dataset and store it in the root directory
            api_key: a RadiantEarth MLHub API key to use for downloading the dataset
            checksum: if True, check the MD5 of the downloaded files (may be slow)

        Raises:
            AssertionError: if ``split`` argument is invalid
            RuntimeError: if ``download=False`` but dataset is missing or checksum fails
        """
        assert split in self.md5s

        self.root = root
        self.split = split
        self.transforms = transforms
        self.checksum = checksum

        if download:
            self._download(api_key)

        if not self._check_integrity():
            raise RuntimeError(
                "Dataset not found or corrupted. "
                + "You can use download=True to download it"
            )

        output_dir = "_".join([self.collection_id, split, "source"])
        filename = os.path.join(root, output_dir, "collection.json")
        with open(filename) as f:
            self.collection = json.load(f)["links"]
    def __getitem__(self, index: int) -> Dict[str, Any]:
        """Return an index within the dataset.

        Args:
            index: index to return

        Returns:
            data, labels, field ids, and metadata at that index
        """
        source_id = os.path.split(self.collection[index]["href"])[0]
        directory = os.path.join(
            self.root,
            "_".join([self.collection_id, self.split, "{0}"]),
            source_id.replace("source", "{0}"),
        )

        sample: Dict[str, Any] = {"image": self._load_image(directory)}
        sample.update(self._load_features(directory))

        if self.transforms is not None:
            sample = self.transforms(sample)

        return sample
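    # Note (descriptive, not part of the upstream source): the returned ``sample``
    # dict holds the image tensor under "image" plus every key read from the
    # per-sample features.json and labels.json files; _load_features (below) casts
    # "relative_time" and "ocean" to int and adds an integer "label" copied from
    # "wind_speed". The exact feature keys depend on the files in the collection.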
    def __len__(self) -> int:
        """Return the number of data points in the dataset.

        Returns:
            length of the dataset
        """
        return len(self.collection)
    @lru_cache()
    def _load_image(self, directory: str) -> Tensor:
        """Load a single image.

        Args:
            directory: directory containing image

        Returns:
            the image
        """
        filename = os.path.join(directory.format("source"), "image.jpg")
        with Image.open(filename) as img:
            if img.height != self.size or img.width != self.size:
                img = img.resize(size=(self.size, self.size), resample=Image.BILINEAR)
            array = np.array(img)
            if len(array.shape) == 3:
                array = array[:, :, 0]
            tensor: Tensor = torch.from_numpy(array)  # type: ignore[attr-defined]
            return tensor

    def _load_features(self, directory: str) -> Dict[str, Any]:
        """Load features for a single image.

        Args:
            directory: directory containing image

        Returns:
            the features
        """
        filename = os.path.join(directory.format("source"), "features.json")
        with open(filename) as f:
            features: Dict[str, Any] = json.load(f)

        filename = os.path.join(directory.format("labels"), "labels.json")
        with open(filename) as f:
            features.update(json.load(f))

        features["relative_time"] = int(features["relative_time"])
        features["ocean"] = int(features["ocean"])
        features["label"] = int(features["wind_speed"])

        return features

    def _check_integrity(self) -> bool:
        """Check integrity of dataset.

        Returns:
            True if dataset files are found and/or MD5s match, else False
        """
        for split, resources in self.md5s.items():
            for resource_type, md5 in resources.items():
                filename = "_".join([self.collection_id, split, resource_type])
                filename = os.path.join(self.root, filename + ".tar.gz")
                if not check_integrity(filename, md5 if self.checksum else None):
                    return False
        return True

    def _download(self, api_key: Optional[str] = None) -> None:
        """Download the dataset and extract it.

        Args:
            api_key: a RadiantEarth MLHub API key to use for downloading the dataset

        Raises:
            RuntimeError: if download doesn't work correctly or checksums don't match
        """
        if self._check_integrity():
            print("Files already downloaded and verified")
            return

        download_radiant_mlhub_dataset(self.collection_id, self.root, api_key)

        for split, resources in self.md5s.items():
            for resource_type in resources:
                filename = "_".join([self.collection_id, split, resource_type])
                filename = os.path.join(self.root, filename) + ".tar.gz"
                extract_archive(filename, self.root)


class CycloneDataModule(pl.LightningDataModule):
    """LightningDataModule implementation for the NASA Cyclone dataset.

    Implements 80/20 train/val splits based on hurricane storm ids.
    See :func:`setup` for more details.
    """
    def __init__(
        self,
        root_dir: str,
        seed: int,
        batch_size: int = 64,
        num_workers: int = 0,
        api_key: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize a LightningDataModule for NASA Cyclone based DataLoaders.

        Args:
            root_dir: The ``root`` argument to pass to the
                TropicalCycloneWindEstimation Dataset classes
            seed: The seed value to use when doing the sklearn based GroupShuffleSplit
            batch_size: The batch size to use in all created DataLoaders
            num_workers: The number of workers to use in all created DataLoaders
            api_key: The RadiantEarth MLHub API key to use if the dataset needs to be
                downloaded
        """
        super().__init__()  # type: ignore[no-untyped-call]
        self.root_dir = root_dir
        self.seed = seed
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.api_key = api_key
    def custom_transform(self, sample: Dict[str, Any]) -> Dict[str, Any]:
        """Transform a single sample from the Dataset.

        Args:
            sample: dictionary containing image and target

        Returns:
            preprocessed sample
        """
        sample["image"] = sample["image"] / 255.0  # scale to [0, 1]
        sample["image"] = (
            sample["image"].unsqueeze(0).repeat(3, 1, 1)
        )  # convert to 3 channel
        sample["label"] = torch.as_tensor(  # type: ignore[attr-defined]
            sample["label"]
        ).float()

        return sample
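    # Shape sketch (descriptive comment, assuming the default 366x366 imagery): a
    # (366, 366) uint8 image tensor becomes a (3, 366, 366) float tensor in [0, 1],
    # and the integer label becomes a scalar float tensor suitable for regression.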
    def prepare_data(self) -> None:
        """Initialize the main ``Dataset`` objects for use in :func:`setup`.

        This includes optionally downloading the dataset. This is done once per node,
        while :func:`setup` is done once per GPU.
        """
        TropicalCycloneWindEstimation(
            self.root_dir,
            split="train",
            transforms=self.custom_transform,
            download=self.api_key is not None,
            api_key=self.api_key,
        )
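    # Note (descriptive, not part of the upstream source): the download only happens
    # when an ``api_key`` was supplied to the DataModule; otherwise the data is
    # assumed to already exist under ``root_dir``, and a missing or corrupted copy
    # surfaces as the RuntimeError raised by the Dataset constructor.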
    def setup(self, stage: Optional[str] = None) -> None:
        """Create the train/val/test splits based on the original Dataset objects.

        The splits should be done here vs. in :func:`__init__` per the docs:
        https://pytorch-lightning.readthedocs.io/en/latest/extensions/datamodules.html#setup.

        We split samples between train/val by the ``storm_id`` property, i.e. all
        samples with the same ``storm_id`` value will be either in the train or the
        val split. This is important to test one type of generalizability -- given a
        new storm, can we predict its wind speed? The test set, however, contains
        *some* storms from the training set (specifically, the latter parts of the
        storms) as well as some novel storms.

        Args:
            stage: stage to set up
        """
        self.all_train_dataset = TropicalCycloneWindEstimation(
            self.root_dir,
            split="train",
            transforms=self.custom_transform,
            download=False,
        )

        self.all_test_dataset = TropicalCycloneWindEstimation(
            self.root_dir,
            split="test",
            transforms=self.custom_transform,
            download=False,
        )

        storm_ids = []
        for item in self.all_train_dataset.collection:
            storm_id = item["href"].split("/")[0].split("_")[-2]
            storm_ids.append(storm_id)

        train_indices, val_indices = next(
            GroupShuffleSplit(test_size=0.2, n_splits=2, random_state=self.seed).split(
                storm_ids, groups=storm_ids
            )
        )

        self.train_dataset = Subset(self.all_train_dataset, train_indices)
        self.val_dataset = Subset(self.all_train_dataset, val_indices)
        self.test_dataset = Subset(
            self.all_test_dataset, range(len(self.all_test_dataset))
        )
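    # Note on the group split above (illustrative comment with made-up storm ids, not
    # values from the dataset): GroupShuffleSplit assigns whole groups to one side of
    # the split, so every sample sharing a storm_id lands entirely in train or
    # entirely in val. For example, with
    #   storm_ids = ["abc", "abc", "def", "ghi", "ghi"]
    # and test_size=0.2, one draw might put all "abc" and "ghi" samples in
    # train_indices and all "def" samples in val_indices.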
    def train_dataloader(self) -> DataLoader[Any]:
        """Return a DataLoader for training.

        Returns:
            training data loader
        """
        return DataLoader(
            self.train_dataset,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            shuffle=True,
        )
    def val_dataloader(self) -> DataLoader[Any]:
        """Return a DataLoader for validation.

        Returns:
            validation data loader
        """
        return DataLoader(
            self.val_dataset,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            shuffle=False,
        )
    def test_dataloader(self) -> DataLoader[Any]:
        """Return a DataLoader for testing.

        Returns:
            testing data loader
        """
        return DataLoader(
            self.test_dataset,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            shuffle=False,
        )
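
A minimal usage sketch, not part of the module source above: the root directory, MLHub API key, and seed are placeholders, downloading requires the optional radiant-mlhub package, and the commented shapes are what the code above implies rather than verified output.

# --- Example: using the dataset directly ---
ds = TropicalCycloneWindEstimation(
    root="data", split="train", download=True, api_key="YOUR_MLHUB_API_KEY"
)
sample = ds[0]
# sample["image"]: (366, 366) tensor read from image.jpg
# sample["label"]: integer wind speed copied from "wind_speed" in labels.json

# --- Example: using the LightningDataModule ---
dm = CycloneDataModule(root_dir="data", seed=0, batch_size=32, num_workers=4)
dm.prepare_data()  # no api_key given, so this expects the data already on disk
dm.setup()
batch = next(iter(dm.train_dataloader()))
# with default collation, batch["image"] is (32, 3, 366, 366) scaled to [0, 1]
# and batch["label"] is a (32,) float tensor of wind speeds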
