Source code for torchgeo.datasets.nasa_marine_debris

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

"""NASA Marine Debris dataset."""

import os
from typing import Callable, Optional

import matplotlib.pyplot as plt
import numpy as np
import rasterio
import torch
from matplotlib.figure import Figure
from torch import Tensor
from torchvision.utils import draw_bounding_boxes

from .geo import NonGeoDataset
from .utils import (
    DatasetNotFoundError,
    check_integrity,
    download_radiant_mlhub_collection,
    extract_archive,
)

class NASAMarineDebris(NonGeoDataset):
    """NASA Marine Debris dataset.

    The NASA Marine Debris dataset is a dataset for detection of floating
    marine debris in satellite imagery.

    Dataset features:

    * 707 patches with 3 m per pixel resolution (256x256 px)
    * three spectral bands - RGB
    * 1 object class: marine_debris
    * images taken by Planet Labs PlanetScope satellites
    * imagery taken from 2016-2019 from coasts of Greece, Honduras, and Ghana

    Dataset format:

    * images are three-channel geotiffs in uint8 format
    * labels are numpy files (.npy) containing bounding box (xyxy) coordinates
    * additional: images in jpg format and labels in geojson format

    If you use this dataset in your research, please cite the following paper:

    * TODO: the citation link is missing from this copy of the source; restore it

    .. note::

       This dataset requires the following additional library to be installed:

       * ``radiant-mlhub`` to download the imagery and labels from the
         Radiant Earth MLHub

    .. versionadded:: 0.2
    """

    # The four lists below are index-aligned: entry 0 describes the imagery
    # collection and entry 1 the label collection.
    collection_ids = ["nasa_marine_debris_source", "nasa_marine_debris_labels"]
    directories = ["nasa_marine_debris_source", "nasa_marine_debris_labels"]
    filenames = ["nasa_marine_debris_source.tar.gz", "nasa_marine_debris_labels.tar.gz"]
    md5s = ["fe8698d1e68b3f24f0b86b04419a797d", "d8084f5a72778349e07ac90ec1e1d990"]
    class_label = "marine_debris"

    def __init__(
        self,
        root: str = "data",
        transforms: Optional[Callable[[dict[str, Tensor]], dict[str, Tensor]]] = None,
        download: bool = False,
        api_key: Optional[str] = None,
        checksum: bool = False,
        verbose: bool = False,
    ) -> None:
        """Initialize a new NASA Marine Debris Dataset instance.

        Args:
            root: root directory where dataset can be found
            transforms: a function/transform that takes input sample and its target as
                entry and returns a transformed version
            download: if True, download dataset and store it in the root directory
            api_key: a RadiantEarth MLHub API key to use for downloading the dataset
            checksum: if True, check the MD5 of the downloaded files (may be slow)
            verbose: if True, print messages when new tiles are loaded

        Raises:
            DatasetNotFoundError: If dataset is not found and *download* is False.
        """
        self.root = root
        self.transforms = transforms
        self.download = download
        self.api_key = api_key
        self.checksum = checksum
        self.verbose = verbose

        # Must run before _load_files(): it extracts/downloads the directories
        # that _load_files() lists.
        self._verify()
        self.files = self._load_files()

    def __getitem__(self, index: int) -> dict[str, Tensor]:
        """Return an index within the dataset.

        Args:
            index: index to return

        Returns:
            data and labels at that index
        """
        entry = self.files[index]
        image = self._load_image(entry["image"])
        boxes = self._load_target(entry["target"])

        # Filter invalid boxes: keep only those with strictly positive
        # width (x2 - x1) and height (y2 - y1).
        widths = boxes[:, 2] - boxes[:, 0]
        heights = boxes[:, 3] - boxes[:, 1]
        boxes = boxes[(widths > 0) & (heights > 0)]

        sample = {"image": image, "boxes": boxes}

        if self.transforms is not None:
            sample = self.transforms(sample)

        return sample

    def __len__(self) -> int:
        """Return the number of data points in the dataset.

        Returns:
            length of the dataset
        """
        return len(self.files)

    def _load_image(self, path: str) -> Tensor:
        """Load a single image.

        Args:
            path: path to the image

        Returns:
            the image as a floating point tensor
        """
        with rasterio.open(path) as f:
            array = f.read()
        return torch.from_numpy(array).float()

    def _load_target(self, path: str) -> Tensor:
        """Load the target bounding boxes for a single image.

        Args:
            path: path to the labels

        Returns:
            the target boxes
        """
        array = np.load(path)
        # boxes contain unnecessary value of 1 after xyxy coords
        array = array[:, :4]
        return torch.from_numpy(array)

    def _load_files(self) -> list[dict[str, str]]:
        """Load image and label file paths.

        Returns:
            list of dicts containing image and label files
        """
        image_root = os.path.join(self.root, self.directories[0])
        target_root = os.path.join(self.root, self.directories[1])

        # Entries ending in "json" are skipped; every other entry is treated
        # as a per-sample folder.
        image_folders = sorted(
            f for f in os.listdir(image_root) if not f.endswith("json")
        )

        return [
            {
                "image": os.path.join(image_root, folder, "image_geotiff.tif"),
                # The label folder mirrors the image folder name with
                # "source" swapped for "labels".
                "target": os.path.join(
                    target_root,
                    folder.replace("source", "labels"),
                    "pixel_bounds.npy",
                ),
            }
            for folder in image_folders
        ]

    def _check_and_extract(self, filepath: str, md5: str) -> None:
        """Optionally verify the MD5 of an archive, then extract it.

        Args:
            filepath: path to the archive
            md5: expected MD5 checksum of the archive

        Raises:
            RuntimeError: if ``self.checksum`` is set and the checksum does not match
        """
        if self.checksum and not check_integrity(filepath, md5):
            raise RuntimeError("Dataset checksum mismatch.")
        extract_archive(filepath)

    def _verify(self) -> None:
        """Verify the integrity of the dataset.

        Raises:
            DatasetNotFoundError: If dataset is not found and *download* is False.
            RuntimeError: If *checksum* is True and an archive's MD5 does not match.
        """
        # Check if the files already exist
        if all(
            os.path.exists(os.path.join(self.root, directory))
            for directory in self.directories
        ):
            return

        # Check if zip file already exists (if so then extract)
        exists = []
        for filename, md5 in zip(self.filenames, self.md5s):
            filepath = os.path.join(self.root, filename)
            found = os.path.exists(filepath)
            if found:
                self._check_and_extract(filepath, md5)
            exists.append(found)

        if all(exists):
            return

        # Check if the user requested to download the dataset
        if not self.download:
            raise DatasetNotFoundError(self)

        # Download and extract the dataset
        for collection_id in self.collection_ids:
            download_radiant_mlhub_collection(collection_id, self.root, self.api_key)

        for filename, md5 in zip(self.filenames, self.md5s):
            self._check_and_extract(os.path.join(self.root, filename), md5)

    def plot(
        self,
        sample: dict[str, Tensor],
        show_titles: bool = True,
        suptitle: Optional[str] = None,
    ) -> Figure:
        """Plot a sample from the dataset.

        Args:
            sample: a sample returned by :meth:`__getitem__`
            show_titles: flag indicating whether to show titles above each panel
            suptitle: optional string to use as a suptitle

        Returns:
            a matplotlib Figure with the rendered sample
        """
        # Convert into a local variable so the caller's sample dict is not
        # modified by plotting.
        image_uint8 = sample["image"].byte()

        truth = image_uint8
        if "boxes" in sample and len(sample["boxes"]):
            truth = draw_bounding_boxes(image=image_uint8, boxes=sample["boxes"])
        # Channels-first tensor -> channels-last array for imshow.
        panels = [("Ground Truth", truth.permute((1, 2, 0)).numpy())]

        if "prediction_boxes" in sample and len(sample["prediction_boxes"]):
            preds = draw_bounding_boxes(
                image=image_uint8, boxes=sample["prediction_boxes"]
            )
            panels.append(("Predictions", preds.permute((1, 2, 0)).numpy()))

        ncols = len(panels)
        # squeeze=False always yields an array of axes, so one- and two-panel
        # figures share the same drawing loop.
        fig, axs = plt.subplots(ncols=ncols, figsize=(ncols * 10, 10), squeeze=False)

        for ax, (title, panel) in zip(axs.flat, panels):
            ax.imshow(panel)
            ax.axis("off")
            if show_titles:
                ax.set_title(title)

        if suptitle is not None:
            fig.suptitle(suptitle)

        return fig

© Copyright 2021, Microsoft Corporation. Revision 0e2c76d3.

Built with Sphinx using a theme provided by Read the Docs.
Read the Docs v: latest
On Read the Docs
Project Home

Free document hosting provided by Read the Docs.


Access comprehensive developer documentation for PyTorch

View Docs


Get in-depth tutorials for beginners and advanced developers

View Tutorials


Find development resources and get your questions answered

View Resources