16  Guide to Data Annotation

16.1 Industry Landscape

16.1.1 Major Players

Table 16.1: Major Players in the Data Annotation Business
Company Specialization Business Model Scale
Scale AI Multi-modal, autonomous vehicles Managed service $7B+ valuation
Labelbox Platform + services Software + marketplace Series D
Appen Text, speech, image Crowd platform Public (ASX)
Amazon SageMaker Ground Truth AWS-integrated Platform Part of AWS
Hive Computer vision API + human loop Series C
Snorkel AI Weak supervision, programmatic Software platform Series C
SuperAnnotate Computer vision, LiDAR Platform + services Series B
Dataloop MLOps + annotation Integrated platform Series C

Table 16.1 shows major players in this market.

16.1.2 Open Source Tools

Table 16.2: Open Source Tools
Tool Best For Language
Label Studio Multi-modal, flexible Python/React
CVAT Computer vision, video Python/React
Prodigy Text, active learning Python
Doccano Text annotation Python/Vue
VoTT Object detection TypeScript
Labelme Image segmentation Python
BRAT Text, NER Python/JavaScript

Table 16.2 shows current open-source that are available on the market.

Our Choice: Label Studio

Why Label Studio for this guide:

  • Multi-modal: Handles text, image, video, audio

  • Highly customizable: XML-based config

  • Active community: Regular updates

  • Production-ready: Used by major companies

  • ML integration: Pre-annotation, active learning

  • Self-hostable: Full control over data

16.2 Market Economics

16.2.1 Pricing Benchmarks (2024-2025)

Code
import pandas as pd
import plotly.graph_objects as go
import numpy as np

# Pricing data from market research
pricing_data = pd.DataFrame({
    'Task Type': [
        'Text Classification',
        'NER (per doc)',
        'Sentiment Analysis',
        'Image Classification',
        'Bounding Box (per image)',
        'Semantic Segmentation',
        'Video Object Tracking (per min)',
        'Audio Transcription (per min)',
        'Medical Image Annotation',
        '3D Point Cloud (LIDAR)'
    ],
    'Low ($)': [0.01, 0.05, 0.02, 0.02, 0.08, 0.50, 2.00, 0.80, 3.00, 5.00],
    'Median ($)': [0.03, 0.15, 0.05, 0.05, 0.25, 1.50, 5.00, 1.50, 8.00, 15.00],
    'High ($)': [0.08, 0.40, 0.12, 0.15, 0.75, 5.00, 15.00, 3.00, 20.00, 40.00],
    'Complexity': [1, 3, 2, 1, 3, 5, 7, 4, 8, 9]
})

fig = go.Figure()

fig.add_trace(go.Bar(
    name='Low',
    x=pricing_data['Task Type'],
    y=pricing_data['Low ($)'],
    marker_color='lightblue'
))

fig.add_trace(go.Bar(
    name='Median',
    x=pricing_data['Task Type'],
    y=pricing_data['Median ($)'],
    marker_color='steelblue'
))

fig.add_trace(go.Bar(
    name='High',
    x=pricing_data['Task Type'],
    y=pricing_data['High ($)'],
    marker_color='darkblue'
))

fig.update_layout(
    barmode='group',
    title='Market Pricing by Task Type (2024-2025)',
    xaxis_title='Task Type',
    yaxis_title='Price (USD)',
    yaxis_type='log',
    height=500,
    hovermode='x unified'
)

fig.show()
Figure 16.1

16.2.2 Cost Structure

Code
import plotly.graph_objects as go

costs = {
    'Labor (Annotators)': 40,
    'QA/Review': 15,
    'Platform/Infrastructure': 10,
    'Management/PM': 12,
    'Training/Onboarding': 5,
    'Customer Support': 3,
    'Overhead': 5,
    'Profit Margin': 10
}

fig = go.Figure(data=[go.Pie(
    labels=list(costs.keys()),
    values=list(costs.values()),
    hole=.3,
    marker_colors=['#FF6B6B', '#4ECDC4', '#45B7D1', '#FFA07A', 
                   '#98D8C8', '#F7DC6F', '#BB8FCE', '#85C1E2']
)])

fig.update_layout(
    title='Typical Cost Structure for Annotation Service (%)',
    height=500
)

fig.show()
Figure 16.2

17 Media Types & Annotation Tasks

17.1 Text Annotation

17.1.1 Task Taxonomy

Table 17.1: Text annotation task types
Code
import pandas as pd

text_tasks = pd.DataFrame({
    'Task': [
        'Text Classification',
        'Named Entity Recognition',
        'Relation Extraction',
        'Sentiment Analysis',
        'Intent Detection',
        'Slot Filling',
        'Coreference Resolution',
        'Semantic Role Labeling',
        'Question Answering',
        'Summarization Quality'
    ],
    'Level': [
        'Document',
        'Token/Span',
        'Span Pair',
        'Document/Aspect',
        'Utterance',
        'Token',
        'Multi-span',
        'Sentence',
        'Span',
        'Document'
    ],
    'Complexity': [1, 3, 4, 2, 2, 3, 5, 5, 3, 4],
    'Avg Time (sec)': [5, 45, 60, 15, 10, 30, 90, 120, 40, 60],
    'Typical Agreement (κ)': [0.85, 0.75, 0.65, 0.70, 0.80, 0.75, 0.60, 0.55, 0.70, 0.65]
})

text_tasks

17.1.2 Named Entity Recognition (NER)

Annotation Interface Requirements:

class NERAnnotation:
    """
    Data structure for NER annotation
    """
    def __init__(self, text: str):
        self.text = text
        self.entities = []
        
    def add_entity(self, start: int, end: int, label: str, text: str):
        """
        Add entity span
        
        Args:
            start: Character offset start
            end: Character offset end
            label: Entity type (PERSON, ORG, LOC, etc.)
            text: The actual text span
        """
        entity = {
            'start': start,
            'end': end,
            'label': label,
            'text': text
        }
        
        # Validate no overlap with existing entities
        if not self._check_overlap(start, end):
            self.entities.append(entity)
            return True
        return False
    
    def _check_overlap(self, start: int, end: int) -> bool:
        """Check if span overlaps with existing entities"""
        for entity in self.entities:
            if not (end <= entity['start'] or start >= entity['end']):
                return True
        return False
    
    def to_dict(self):
        return {
            'text': self.text,
            'entities': self.entities
        }

# Example usage
example_text = "Apple Inc. CEO Tim Cook announced new products in Cupertino."
annotation = NERAnnotation(example_text)

annotation.add_entity(0, 10, 'ORG', 'Apple Inc.')
annotation.add_entity(16, 24, 'PERSON', 'Tim Cook')
annotation.add_entity(54, 63, 'LOC', 'Cupertino')

print(annotation.to_dict())

Common Edge Cases:

Table 17.2: NER edge cases and resolution strategies
Code
import pandas as pd

edge_cases = pd.DataFrame({
    'Scenario': [
        'Nested entities',
        'Discontinuous mentions',
        'Ambiguous boundaries',
        'Coordinated entities',
        'Metonymy',
        'Generic vs specific'
    ],
    'Example': [
        '[Bank of [America]_ORG]_ORG',
        'New York and Los Angeles (two separate LOC)',
        'U.S. vs U.S vs US',
        'Google and Facebook',
        'Wall Street (location vs financial industry)',
        'apple (fruit vs Apple company)'
    ],
    'Resolution Strategy': [
        'Annotate longest span only',
        'Mark each entity separately',
        'Normalize to consistent form',
        'Mark each entity individually',
        'Use context to decide',
        'Require capitalization for ORG'
    ],
    'Guideline Priority': [
        'High',
        'Medium',
        'High',
        'Medium',
        'High',
        'High'
    ]
})

edge_cases

Synthetic Training Data Generation:

import random
from typing import List, Tuple

class SyntheticNERGenerator:
    """
    Generate synthetic NER training data for testing annotation workflows
    """
    
    def __init__(self):
        self.templates = [
            "{PERSON} works at {ORG} in {LOC}.",
            "{ORG} announced that {PERSON} will lead the {ORG} division.",
            "The {ORG} headquarters in {LOC} employs {PERSON}.",
            "{PERSON} traveled from {LOC} to {LOC} for {ORG} business.",
        ]
        
        self.entities = {
            'PERSON': ['Alice Johnson', 'Bob Smith', 'Carol Martinez', 
                      'David Lee', 'Emma Wilson', 'Frank Chen'],
            'ORG': ['TechCorp', 'DataSystems Inc.', 'Global Analytics', 
                   'Innovation Labs', 'Future Solutions'],
            'LOC': ['New York', 'San Francisco', 'London', 
                   'Singapore', 'Berlin', 'Tokyo']
        }
    
    def generate(self, n: int = 10) -> List[dict]:
        """Generate n synthetic examples"""
        examples = []
        
        for _ in range(n):
            template = random.choice(self.templates)
            entities_used = {}
            
            # Replace placeholders
            text = template
            for entity_type in ['PERSON', 'ORG', 'LOC']:
                count = text.count(f'{{{entity_type}}}')
                entities_used[entity_type] = random.sample(
                    self.entities[entity_type], 
                    count
                )
            
            # Build annotated version
            annotations = []
            for entity_type in ['PERSON', 'ORG', 'LOC']:
                for entity_text in entities_used[entity_type]:
                    placeholder = f'{{{entity_type}}}'
                    start = text.find(placeholder)
                    if start != -1:
                        text = text.replace(placeholder, entity_text, 1)
                        annotations.append({
                            'start': start,
                            'end': start + len(entity_text),
                            'label': entity_type,
                            'text': entity_text
                        })
            
            examples.append({
                'text': text,
                'entities': sorted(annotations, key=lambda x: x['start'])
            })
        
        return examples

# Generate examples
generator = SyntheticNERGenerator()
synthetic_examples = generator.generate(5)

for i, example in enumerate(synthetic_examples, 1):
    print(f"\nExample {i}:")
    print(f"Text: {example['text']}")
    print(f"Entities: {example['entities']}")

17.2 Image Annotation

17.2.1 Task Taxonomy

Code
import plotly.graph_objects as go
import numpy as np

tasks = [
    'Image Classification',
    'Bounding Box',
    'Polygon Segmentation', 
    'Semantic Segmentation',
    'Instance Segmentation',
    'Keypoint Annotation',
    'Panoptic Segmentation',
    '3D Cuboid'
]

# Complexity (1-10), Time (seconds), Precision Required (1-10)
complexity = np.array([1, 3, 5, 6, 7, 4, 8, 6])
time_required = np.array([5, 20, 120, 300, 400, 60, 450, 90])
precision = np.array([3, 6, 8, 9, 9, 7, 10, 8])

fig = go.Figure()

fig.add_trace(go.Scatter(
    x=complexity,
    y=time_required,
    mode='markers+text',
    marker=dict(
        size=precision * 5,
        color=precision,
        colorscale='Viridis',
        showscale=True,
        colorbar=dict(title="Precision<br>Required")
    ),
    text=tasks,
    textposition='top center',
    hovertemplate='<b>%{text}</b><br>' +
                  'Complexity: %{x}<br>' +
                  'Time: %{y} sec<br>' +
                  '<extra></extra>'
))

fig.update_layout(
    title='Image Annotation Task Space',
    xaxis_title='Task Complexity (1-10)',
    yaxis_title='Average Time Required (seconds)',
    height=600,
    width=900
)

fig.show()
Figure 17.1

17.2.2 Bounding Box Annotation

Data Structure:

from dataclasses import dataclass
from typing import List, Tuple
import json

@dataclass
class BoundingBox:
    """
    Bounding box representation
    
    Formats:
    - XYXY: (x_min, y_min, x_max, y_max)
    - XYWH: (x, y, width, height)
    - COCO: [x, y, width, height] (top-left corner)
    - YOLO: (x_center, y_center, width, height) normalized
    """
    x: float
    y: float
    width: float
    height: float
    label: str
    confidence: float = 1.0
    image_width: int = None
    image_height: int = None
    
    def to_xyxy(self) -> Tuple[float, float, float, float]:
        """Convert to (x_min, y_min, x_max, y_max)"""
        return (self.x, self.y, self.x + self.width, self.y + self.height)
    
    def to_xywh(self) -> Tuple[float, float, float, float]:
        """Convert to (x, y, width, height)"""
        return (self.x, self.y, self.width, self.height)
    
    def to_yolo(self) -> Tuple[float, float, float, float]:
        """Convert to YOLO format (normalized center coordinates)"""
        if self.image_width is None or self.image_height is None:
            raise ValueError("Image dimensions required for YOLO format")
        
        x_center = (self.x + self.width / 2) / self.image_width
        y_center = (self.y + self.height / 2) / self.image_height
        width_norm = self.width / self.image_width
        height_norm = self.height / self.image_height
        
        return (x_center, y_center, width_norm, height_norm)
    
    def to_coco(self) -> dict:
        """Convert to COCO format"""
        return {
            'bbox': [self.x, self.y, self.width, self.height],
            'category': self.label,
            'score': self.confidence
        }
    
    def area(self) -> float:
        """Calculate box area"""
        return self.width * self.height
    
    def iou(self, other: 'BoundingBox') -> float:
        """
        Calculate Intersection over Union with another box
        
        Used for:
        - Agreement between annotators
        - Matching predicted vs ground truth
        - Non-maximum suppression
        """
        x1_min, y1_min, x1_max, y1_max = self.to_xyxy()
        x2_min, y2_min, x2_max, y2_max = other.to_xyxy()
        
        # Calculate intersection
        x_left = max(x1_min, x2_min)
        y_top = max(y1_min, y2_min)
        x_right = min(x1_max, x2_max)
        y_bottom = min(y1_max, y2_max)
        
        if x_right < x_left or y_bottom < y_top:
            return 0.0
        
        intersection_area = (x_right - x_left) * (y_bottom - y_top)
        
        # Calculate union
        box1_area = self.area()
        box2_area = other.area()
        union_area = box1_area + box2_area - intersection_area
        
        return intersection_area / union_area if union_area > 0 else 0.0

# Example usage
box1 = BoundingBox(x=100, y=100, width=200, height=150, label='person',
                   image_width=1920, image_height=1080)
box2 = BoundingBox(x=120, y=110, width=180, height=140, label='person',
                   image_width=1920, image_height=1080)

print(f"Box 1 (XYXY): {box1.to_xyxy()}")
print(f"Box 1 (YOLO): {box1.to_yolo()}")
print(f"IoU between boxes: {box1.iou(box2):.3f}")

Quality Metrics for Bounding Boxes:

import numpy as np
from typing import List
from scipy.optimize import linear_sum_assignment

class BoundingBoxQualityMetrics:
    """
    Calculate quality metrics for bounding box annotations
    """
    
    @staticmethod
    def calculate_agreement(
        annotator1_boxes: List[BoundingBox],
        annotator2_boxes: List[BoundingBox],
        iou_threshold: float = 0.5
    ) -> dict:
        """
        Calculate agreement between two annotators
        
        Returns:
            dict with precision, recall, F1, mean_iou
        """
        if len(annotator1_boxes) == 0 and len(annotator2_boxes) == 0:
            return {'precision': 1.0, 'recall': 1.0, 'f1': 1.0, 'mean_iou': 1.0}
        
        if len(annotator1_boxes) == 0 or len(annotator2_boxes) == 0:
            return {'precision': 0.0, 'recall': 0.0, 'f1': 0.0, 'mean_iou': 0.0}
        
        # Build IoU matrix
        iou_matrix = np.zeros((len(annotator1_boxes), len(annotator2_boxes)))
        for i, box1 in enumerate(annotator1_boxes):
            for j, box2 in enumerate(annotator2_boxes):
                if box1.label == box2.label:  # Only match same class
                    iou_matrix[i, j] = box1.iou(box2)
        
        # Hungarian algorithm for optimal matching
        row_ind, col_ind = linear_sum_assignment(-iou_matrix)
        
        # Count matches above threshold
        matches = 0
        matched_ious = []
        for i, j in zip(row_ind, col_ind):
            if iou_matrix[i, j] >= iou_threshold:
                matches += 1
                matched_ious.append(iou_matrix[i, j])
        
        # Calculate metrics
        precision = matches / len(annotator1_boxes) if len(annotator1_boxes) > 0 else 0
        recall = matches / len(annotator2_boxes) if len(annotator2_boxes) > 0 else 0
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
        mean_iou = np.mean(matched_ious) if matched_ious else 0.0
        
        return {
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'mean_iou': mean_iou,
            'matches': matches,
            'total_boxes_ann1': len(annotator1_boxes),
            'total_boxes_ann2': len(annotator2_boxes)
        }
    
    @staticmethod
    def aggregate_boxes(
        boxes_list: List[List[BoundingBox]],
        iou_threshold: float = 0.5,
        min_consensus: int = 2
    ) -> List[BoundingBox]:
        """
        Aggregate bounding boxes from multiple annotators
        
        Strategy: Cluster boxes by IoU, keep clusters with min_consensus
        """
        if not boxes_list:
            return []
        
        # Flatten all boxes with annotator ID
        all_boxes = []
        for annotator_id, boxes in enumerate(boxes_list):
            for box in boxes:
                all_boxes.append((annotator_id, box))
        
        if not all_boxes:
            return []
        
        # Cluster boxes
        clusters = []
        used = set()
        
        for i, (ann_id1, box1) in enumerate(all_boxes):
            if i in used:
                continue
            
            cluster = [(ann_id1, box1)]
            used.add(i)
            
            for j, (ann_id2, box2) in enumerate(all_boxes):
                if j in used or ann_id1 == ann_id2:
                    continue
                
                if box1.label == box2.label and box1.iou(box2) >= iou_threshold:
                    cluster.append((ann_id2, box2))
                    used.add(j)
            
            if len(cluster) >= min_consensus:
                clusters.append(cluster)
        
        # Average boxes in each cluster
        aggregated = []
        for cluster in clusters:
            avg_x = np.mean([box.x for _, box in cluster])
            avg_y = np.mean([box.y for _, box in cluster])
            avg_w = np.mean([box.width for _, box in cluster])
            avg_h = np.mean([box.height for _, box in cluster])
            label = cluster[0][1].label
            confidence = len(cluster) / len(boxes_list)
            
            aggregated.append(BoundingBox(
                x=avg_x, y=avg_y, width=avg_w, height=avg_h,
                label=label, confidence=confidence
            ))
        
        return aggregated

# Example: Calculate agreement
ann1_boxes = [
    BoundingBox(100, 100, 200, 150, 'car'),
    BoundingBox(400, 200, 150, 100, 'person')
]

ann2_boxes = [
    BoundingBox(105, 105, 195, 145, 'car'),
    BoundingBox(420, 210, 140, 95, 'person'),
    BoundingBox(700, 300, 80, 120, 'bicycle')
]

metrics = BoundingBoxQualityMetrics.calculate_agreement(ann1_boxes, ann2_boxes)
print("Agreement Metrics:")
for key, value in metrics.items():
    print(f"  {key}: {value:.3f}" if isinstance(value, float) else f"  {key}: {value}")

17.2.3 Segmentation Masks

Polygon Representation:

import numpy as np
from shapely.geometry import Polygon
from shapely.validation import make_valid
import cv2

class SegmentationMask:
    """
    Represent segmentation masks in multiple formats
    """
    
    def __init__(self, polygon_points: List[Tuple[int, int]] = None, 
                 mask_array: np.ndarray = None, label: str = None):
        """
        Initialize from either polygon points or binary mask
        
        Args:
            polygon_points: List of (x, y) tuples
            mask_array: Binary numpy array (H, W)
            label: Class label
        """
        self.label = label
        
        if polygon_points is not None:
            self.polygon = Polygon(polygon_points)
            if not self.polygon.is_valid:
                self.polygon = make_valid(self.polygon)
            self._mask = None
            self._rle = None
        elif mask_array is not None:
            self._mask = mask_array.astype(np.uint8)
            self.polygon = None
            self._rle = None
        else:
            raise ValueError("Must provide either polygon_points or mask_array")
    
    def to_polygon(self) -> List[Tuple[int, int]]:
        """Get polygon coordinates"""
        if self.polygon:
            return list(self.polygon.exterior.coords)
        else:
            # Extract contours from mask
            contours, _ = cv2.findContours(
                self._mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
            )
            if contours:
                # Return largest contour
                largest = max(contours, key=cv2.contourArea)
                return [(int(pt[0][0]), int(pt[0][1])) for pt in largest]
            return []
    
    def to_mask(self, height: int, width: int) -> np.ndarray:
        """Convert to binary mask"""
        if self._mask is not None:
            return self._mask
        
        # Rasterize polygon
        mask = np.zeros((height, width), dtype=np.uint8)
        points = np.array(self.to_polygon(), dtype=np.int32)
        cv2.fillPoly(mask, [points], 1)
        self._mask = mask
        return mask
    
    def to_rle(self, height: int, width: int) -> dict:
        """
        Convert to Run-Length Encoding (COCO format)
        
        More compact storage for masks
        """
        mask = self.to_mask(height, width)
        
        # Flatten mask in Fortran order (column-major)
        pixels = mask.T.flatten()
        
        # Find run lengths
        pixels = np.concatenate([[0], pixels, [0]])
        runs = np.where(pixels[1:] != pixels[:-1])[0] + 1
        runs[1::2] -= runs[::2]
        
        return {
            'counts': runs.tolist(),
            'size': [height, width]
        }
    
    def area(self) -> float:
        """Calculate mask area"""
        if self.polygon:
            return self.polygon.area
        elif self._mask is not None:
            return np.sum(self._mask)
        return 0
    
    def iou(self, other: 'SegmentationMask', height: int, width: int) -> float:
        """Calculate IoU with another mask"""
        mask1 = self.to_mask(height, width)
        mask2 = other.to_mask(height, width)
        
        intersection = np.logical_and(mask1, mask2).sum()
        union = np.logical_or(mask1, mask2).sum()
        
        return intersection / union if union > 0 else 0.0
    
    def dice_coefficient(self, other: 'SegmentationMask', 
                        height: int, width: int) -> float:
        """
        Calculate Dice coefficient (F1 for segmentation)
        
        Dice = 2 * |A ∩ B| / (|A| + |B|)
        """
        mask1 = self.to_mask(height, width)
        mask2 = other.to_mask(height, width)
        
        intersection = np.logical_and(mask1, mask2).sum()
        sum_areas = mask1.sum() + mask2.sum()
        
        return 2 * intersection / sum_areas if sum_areas > 0 else 0.0

# Example usage
polygon_points = [(100, 100), (200, 100), (200, 200), (100, 200), (100, 100)]
seg_mask = SegmentationMask(polygon_points=polygon_points, label='person')

# Convert to different formats
binary_mask = seg_mask.to_mask(height=300, width=300)
rle = seg_mask.to_rle(height=300, width=300)

print(f"Mask shape: {binary_mask.shape}")
print(f"Mask area: {seg_mask.area()}")
print(f"RLE counts (first 10): {rle['counts'][:10]}")

17.3 Video Annotation

17.3.1 Challenges Unique to Video

Table 17.3: Video annotation complexity factors
Code
import pandas as pd

video_challenges = pd.DataFrame({
    'Challenge': [
        'Temporal Consistency',
        'Occlusion Handling',
        'Object Re-identification',
        'Motion Blur',
        'Scale Variation',
        'Annotation Volume',
        'Keyframe Selection',
        'Interpolation Accuracy'
    ],
    'Impact on Quality': [
        'High - ID switches common',
        'High - Lost tracks',
        'High - Same object different IDs',
        'Medium - Boundary uncertainty',
        'Medium - Small object detection',
        'High - Prohibitive manual effort',
        'Medium - Miss important frames',
        'Medium - Drift between keyframes'
    ],
    'Mitigation Strategy': [
        'Track review UI with temporal context',
        'Flag occlusion states explicitly',
        'Visual similarity matching tools',
        'Multiple frame context',
        'Consistent zoom level',
        'Keyframe + interpolation workflow',
        'Scene change detection',
        'Optical flow-based interpolation'
    ],
    'Cost Impact': [
        '3-5x vs single frame',
        '1.5x (review time)',
        '2x (manual correction)',
        '1.2x (slower annotation)',
        '1.3x (zoom overhead)',
        '10-30x (30 fps video)',
        '0.5x (reduces frames)',
        '0.3x (auto-fills frames)'
    ]
})

video_challenges

17.3.2 Object Tracking Data Structure

from dataclasses import dataclass, field
from typing import Dict, List, Optional
from enum import Enum

class ObjectState(Enum):
    """Object visibility states"""
    VISIBLE = "visible"
    OCCLUDED = "occluded"
    OUT_OF_FRAME = "out_of_frame"
    UNCERTAIN = "uncertain"

@dataclass
class TrackedObject:
    """
    Single object tracked across multiple frames
    """
    track_id: int
    label: str
    frames: Dict[int, dict] = field(default_factory=dict)
    
    def add_detection(self, frame_num: int, bbox: BoundingBox, 
                     state: ObjectState = ObjectState.VISIBLE,
                     keyframe: bool = False):
        """
        Add detection at specific frame
        
        Args:
            frame_num: Frame number
            bbox: Bounding box at this frame
            state: Visibility state
            keyframe: Is this a manually annotated keyframe?
        """
        self.frames[frame_num] = {
            'bbox': bbox,
            'state': state,
            'keyframe': keyframe
        }
    
    def interpolate_frames(self, start_frame: int, end_frame: int,
                          method: str = 'linear'):
        """
        Interpolate bounding boxes between keyframes
        
        Args:
            start_frame: Start frame (must be annotated)
            end_frame: End frame (must be annotated)
            method: 'linear', 'cubic', or 'optical_flow'
        """
        if start_frame not in self.frames or end_frame not in self.frames:
            raise ValueError("Both start and end frames must be annotated")
        
        if method == 'linear':
            self._linear_interpolation(start_frame, end_frame)
        elif method == 'cubic':
            self._cubic_interpolation(start_frame, end_frame)
        else:
            raise ValueError(f"Unknown interpolation method: {method}")
    
    def _linear_interpolation(self, start_frame: int, end_frame: int):
        """Linear interpolation of bounding boxes"""
        start_bbox = self.frames[start_frame]['bbox']
        end_bbox = self.frames[end_frame]['bbox']
        
        num_frames = end_frame - start_frame
        
        for i in range(1, num_frames):
            frame_num = start_frame + i
            alpha = i / num_frames
            
            # Interpolate each coordinate
            x = start_bbox.x + alpha * (end_bbox.x - start_bbox.x)
            y = start_bbox.y + alpha * (end_bbox.y - start_bbox.y)
            w = start_bbox.width + alpha * (end_bbox.width - start_bbox.width)
            h = start_bbox.height + alpha * (end_bbox.height - start_bbox.height)
            
            interpolated_bbox = BoundingBox(
                x=x, y=y, width=w, height=h, label=self.label
            )
            
            self.add_detection(
                frame_num, 
                interpolated_bbox,
                state=ObjectState.VISIBLE,
                keyframe=False
            )
    
    def get_trajectory(self) -> List[Tuple[int, BoundingBox]]:
        """Get sorted list of (frame, bbox) tuples"""
        return sorted(
            [(f, d['bbox']) for f, d in self.frames.items()],
            key=lambda x: x[0]
        )
    
    def coverage(self, total_frames: int) -> float:
        """Calculate what % of frames have annotations"""
        return len(self.frames) / total_frames

@dataclass
class VideoAnnotation:
    """
    Complete annotation for a video
    """
    video_id: str
    fps: float
    total_frames: int
    width: int
    height: int
    tracks: Dict[int, TrackedObject] = field(default_factory=dict)
    _next_track_id: int = field(default=0, init=False)
    
    def create_track(self, label: str, first_frame: int, 
                    bbox: BoundingBox) -> int:
        """
        Create new tracked object
        
        Returns:
            track_id of created track
        """
        track_id = self._next_track_id
        self._next_track_id += 1
        
        track = TrackedObject(track_id=track_id, label=label)
        track.add_detection(first_frame, bbox, keyframe=True)
        
        self.tracks[track_id] = track
        return track_id
    
    def get_frame_annotations(self, frame_num: int) -> List[Tuple[int, BoundingBox]]:
        """Get all bounding boxes for a specific frame"""
        annotations = []
        for track_id, track in self.tracks.items():
            if frame_num in track.frames:
                annotations.append((
                    track_id, 
                    track.frames[frame_num]['bbox']
                ))
        return annotations
    
    def export_to_mot_format(self, output_path: str):
        """
        Export to MOT Challenge format
        
        Format: <frame>, <id>, <bb_left>, <bb_top>, <bb_width>, <bb_height>, <conf>, <x>, <y>, <z>
        """
        with open(output_path, 'w') as f:
            for track_id, track in self.tracks.items():
                for frame_num in sorted(track.frames.keys()):
                    bbox = track.frames[frame_num]['bbox']
                    # MOT format uses 1-indexed frames
                    f.write(f"{frame_num + 1},{track_id},{bbox.x},{bbox.y},"
                           f"{bbox.width},{bbox.height},1,-1,-1,-1\n")

# Example usage
video_ann = VideoAnnotation(
    video_id="traffic_001",
    fps=30.0,
    total_frames=900,  # 30 seconds
    width=1920,
    height=1080
)

# Create track for a car
car_track_id = video_ann.create_track(
    label="car",
    first_frame=0,
    bbox=BoundingBox(100, 200, 150, 100, "car")
)

# Add keyframe at frame 30
video_ann.tracks[car_track_id].add_detection(
    frame_num=30,
    bbox=BoundingBox(250, 220, 160, 110, "car"),
    keyframe=True
)

# Interpolate frames 0-30
video_ann.tracks[car_track_id].interpolate_frames(0, 30, method='linear')

# Get all annotations at frame 15
frame_15_anns = video_ann.get_frame_annotations(15)
print(f"Frame 15 has {len(frame_15_anns)} objects")
print(f"Car position at frame 15: {frame_15_anns[0][1].to_xywh()}")

17.3.3 Video Annotation Workflow

flowchart TD
    A[Load Video] --> B[Automatic Scene Detection]
    B --> C[Present Keyframes to Annotator]
    C --> D[Annotator Labels Objects in Keyframes]
    D --> E{Object Exits/Enters Scene?}
    E -->|Yes| F[Mark Entry/Exit Frames]
    E -->|No| G[Continue to Next Keyframe]
    F --> G
    G --> H[Automatic Interpolation]
    H --> I[Annotator Reviews Interpolated Frames]
    I --> J{Quality OK?}
    J -->|No| K[Add Correction Keyframe]
    K --> H
    J -->|Yes| L[Mark Track as Complete]
    L --> M{More Objects?}
    M -->|Yes| D
    M -->|No| N[Export Annotations]
Figure 17.2: Keyframe-based video annotation workflow

17.4 Audio Annotation

17.4.1 Task Taxonomy

Table 17.4: Audio annotation task types
Code
import pandas as pd

audio_tasks = pd.DataFrame({
    'Task': [
        'Speech Transcription',
        'Speaker Diarization',
        'Emotion Recognition',
        'Intent Classification',
        'Sound Event Detection',
        'Music Tagging',
        'Audio Quality Assessment',
        'Wake Word Detection'
    ],
    'Granularity': [
        'Utterance',
        'Segment',
        'Utterance',
        'Utterance',
        'Event',
        'Track',
        'Track',
        'Segment'
    ],
    'Complexity': [3, 4, 3, 2, 3, 2, 2, 2],
    'Avg Time ($/min audio)': [1.50, 2.00, 0.50, 0.30, 1.00, 0.40, 0.30, 0.40],
    'Special Requirements': [
        'Domain vocabulary',
        'Multi-speaker handling',
        'Cultural context',
        'Domain knowledge',
        'Sound taxonomy',
        'Music knowledge',
        'Audio expertise',
        'Precision timing'
    ]
})

audio_tasks

17.4.2 Transcription Data Structure

from dataclasses import dataclass
from typing import List, Optional
import re

@dataclass
class TranscriptionSegment:
    """
    Single segment of transcribed audio
    """
    start_time: float  # seconds
    end_time: float    # seconds
    text: str
    speaker_id: Optional[str] = None
    confidence: float = 1.0
    
    def duration(self) -> float:
        """Segment duration in seconds"""
        return self.end_time - self.start_time
    
    def word_count(self) -> int:
        """Count words in segment"""
        return len(self.text.split())
    
    def speaking_rate(self) -> float:
        """Words per minute"""
        if self.duration() == 0:
            return 0
        return (self.word_count() / self.duration()) * 60

@dataclass  
class AudioTranscription:
    """
    Complete transcription of audio file
    """
    audio_id: str
    duration: float  # total audio duration
    segments: List[TranscriptionSegment]
    
    def total_words(self) -> int:
        """Total word count"""
        return sum(seg.word_count() for seg in self.segments)
    
    def average_speaking_rate(self) -> float:
        """Average words per minute across all segments"""
        total_duration = sum(seg.duration() for seg in self.segments)
        if total_duration == 0:
            return 0
        return (self.total_words() / total_duration) * 60
    
    def get_speaker_segments(self, speaker_id: str) -> List[TranscriptionSegment]:
        """Get all segments for a specific speaker"""
        return [seg for seg in self.segments if seg.speaker_id == speaker_id]
    
    def to_srt(self) -> str:
        """
        Export to SRT subtitle format
        
        Format:
        1
        00:00:00,000 --> 00:00:02,500
        First subtitle text
        
        2
        00:00:02,500 --> 00:00:05,000
        Second subtitle text
        """
        def format_timestamp(seconds: float) -> str:
            hours = int(seconds // 3600)
            minutes = int((seconds % 3600) // 60)
            secs = int(seconds % 60)
            millis = int((seconds % 1) * 1000)
            return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
        
        srt_output = []
        for i, seg in enumerate(self.segments, 1):
            srt_output.append(f"{i}")
            srt_output.append(
                f"{format_timestamp(seg.start_time)} --> "
                f"{format_timestamp(seg.end_time)}"
            )
            if seg.speaker_id:
                srt_output.append(f"[{seg.speaker_id}] {seg.text}")
            else:
                srt_output.append(seg.text)
            srt_output.append("")  # Blank line between entries
        
        return "\n".join(srt_output)
    
    def to_vtt(self) -> str:
        """Export to WebVTT format"""
        def format_timestamp(seconds: float) -> str:
            hours = int(seconds // 3600)
            minutes = int((seconds % 3600) // 60)
            secs = seconds % 60
            return f"{hours:02d}:{minutes:02d}:{secs:06.3f}"
        
        vtt_output = ["WEBVTT", ""]
        
        for seg in self.segments:
            vtt_output.append(
                f"{format_timestamp(seg.start_time)} --> "
                f"{format_timestamp(seg.end_time)}"
            )
            if seg.speaker_id:
                vtt_output.append(f"<v {seg.speaker_id}>{seg.text}</v>")
            else:
                vtt_output.append(seg.text)
            vtt_output.append("")
        
        return "\n".join(vtt_output)

# Example usage
transcription = AudioTranscription(
    audio_id="interview_001",
    duration=180.0,  # 3 minutes
    segments=[
        TranscriptionSegment(
            start_time=0.0,
            end_time=3.5,
            text="Hello, thank you for joining us today.",
            speaker_id="SPEAKER_1"
        ),
        TranscriptionSegment(
            start_time=3.8,
            end_time=6.2,
            text="Thanks for having me.",
            speaker_id="SPEAKER_2"
        ),
        TranscriptionSegment(
            start_time=6.5,
            end_time=12.1,
            text="Let's start with your background in machine learning.",
            speaker_id="SPEAKER_1"
        )
    ]
)

print(f"Total words: {transcription.total_words()}")
print(f"Average speaking rate: {transcription.average_speaking_rate():.1f} WPM")
print("\nSRT format:")
print(transcription.to_srt()[:200] + "...")

17.4.3 Transcription Quality Metrics

import Levenshtein
from typing import List, Tuple

class TranscriptionQualityMetrics:
    """
    Calculate quality metrics for transcriptions
    """
    
    @staticmethod
    def word_error_rate(reference: str, hypothesis: str) -> float:
        """
        Calculate Word Error Rate (WER)
        
        WER = (Substitutions + Deletions + Insertions) / Total Words in Reference
        
        Standard metric for ASR evaluation
        """
        ref_words = reference.lower().split()
        hyp_words = hypothesis.lower().split()
        
        # Levenshtein distance at word level
        distance = Levenshtein.distance(ref_words, hyp_words)
        
        if len(ref_words) == 0:
            return 0.0 if len(hyp_words) == 0 else float('inf')
        
        return distance / len(ref_words)
    
    @staticmethod
    def character_error_rate(reference: str, hypothesis: str) -> float:
        """
        Calculate Character Error Rate (CER)
        
        More fine-grained than WER
        """
        distance = Levenshtein.distance(reference.lower(), hypothesis.lower())
        
        if len(reference) == 0:
            return 0.0 if len(hypothesis) == 0 else float('inf')
        
        return distance / len(reference)
    
    @staticmethod
    def calculate_agreement(
        transcriptions: List[str],
        use_wer: bool = True
    ) -> dict:
        """
        Calculate inter-annotator agreement for transcriptions
        
        Args:
            transcriptions: List of transcription strings
            use_wer: Use WER (True) or CER (False)
        
        Returns:
            dict with mean agreement, pairwise agreements
        """
        if len(transcriptions) < 2:
            return {'mean_agreement': 1.0, 'pairwise': []}
        
        metric_func = (TranscriptionQualityMetrics.word_error_rate if use_wer 
                      else TranscriptionQualityMetrics.character_error_rate)
        
        pairwise_errors = []
        n = len(transcriptions)
        
        for i in range(n):
            for j in range(i + 1, n):
                error_rate = metric_func(transcriptions[i], transcriptions[j])
                agreement = 1 - error_rate  # Convert error to agreement
                pairwise_errors.append({
                    'transcriber_1': i,
                    'transcriber_2': j,
                    'agreement': max(0, agreement),  # Clamp to [0, 1]
                    'error_rate': error_rate
                })
        
        mean_agreement = np.mean([p['agreement'] for p in pairwise_errors])
        
        return {
            'mean_agreement': mean_agreement,
            'pairwise': pairwise_errors,
            'metric': 'WER' if use_wer else 'CER'
        }
    
    @staticmethod
    def calculate_majority_vote(transcriptions: List[str]) -> str:
        """
        Find consensus transcription using edit distance
        
        Returns transcription with minimum total distance to all others
        """
        if not transcriptions:
            return ""
        
        if len(transcriptions) == 1:
            return transcriptions[0]
        
        min_total_distance = float('inf')
        consensus = transcriptions[0]
        
        for candidate in transcriptions:
            total_distance = sum(
                Levenshtein.distance(candidate.lower(), other.lower())
                for other in transcriptions if other != candidate
            )
            
            if total_distance < min_total_distance:
                min_total_distance = total_distance
                consensus = candidate
        
        return consensus

# Example usage
reference = "The quick brown fox jumps over the lazy dog"
hypothesis1 = "The quick brown fox jumped over the lazy dog"
hypothesis2 = "The quick brown fox jumps over a lazy dog"
hypothesis3 = "The quick braun fox jumps over the lazy dog"

# Calculate WER
wer1 = TranscriptionQualityMetrics.word_error_rate(reference, hypothesis1)
wer2 = TranscriptionQualityMetrics.word_error_rate(reference, hypothesis2)
wer3 = TranscriptionQualityMetrics.word_error_rate(reference, hypothesis3)

print(f"WER (hypothesis 1): {wer1:.3f}")
print(f"WER (hypothesis 2): {wer2:.3f}")
print(f"WER (hypothesis 3): {wer3:.3f}")

# Calculate inter-annotator agreement
transcriptions = [reference, hypothesis1, hypothesis2, hypothesis3]
agreement = TranscriptionQualityMetrics.calculate_agreement(transcriptions)
print(f"\nMean agreement: {agreement['mean_agreement']:.3f}")

# Find consensus
consensus = TranscriptionQualityMetrics.calculate_majority_vote(transcriptions)
print(f"\nConsensus transcription: {consensus}")

18 Quality Assurance & Metrics

18.1 Inter-Rater Reliability

18.1.1 Overview of IRR Metrics

Table 18.1: Inter-rater reliability metrics comparison
Code
import pandas as pd

irr_metrics = pd.DataFrame({
    'Metric': [
        'Percent Agreement',
        'Cohen\'s Kappa',
        'Fleiss\' Kappa',
        'Krippendorff\'s Alpha',
        'ICC (Intraclass Correlation)',
        'F1 Score',
        'Dice Coefficient',
        'IoU (Jaccard Index)'
    ],
    'Data Type': [
        'Categorical',
        'Categorical',
        'Categorical',
        'Any scale',
        'Continuous',
        'Binary/Multi-class',
        'Binary (segmentation)',
        'Binary (detection)'
    ],
    'Raters': [
        '≥2',
        '2',
        '≥2',
        '≥2',
        '≥2',
        '2',
        '2',
        '2'
    ],
    'Accounts for Chance': [
        'No',
        'Yes',
        'Yes',
        'Yes',
        'Partial',
        'No',
        'No',
        'No'
    ],
    'Missing Data': [
        'No',
        'No',
        'No',
        'Yes',
        'Yes (some variants)',
        'No',
        'No',
        'No'
    ],
    'Use Case': [
        'Quick check (not recommended)',
        'Traditional research',
        'Traditional research (>2 raters)',
        'Research (most flexible)',
        'Continuous ratings',
        'ML evaluation',
        'Segmentation evaluation',
        'Detection evaluation'
    ]
})

irr_metrics

18.1.2 Implementation: Traditional Metrics

import numpy as np
from typing import List, Union
from scipy.stats import kendalltau
import pandas as pd

class InterRaterReliability:
    """
    Calculate various inter-rater reliability metrics
    """
    
    @staticmethod
    def percent_agreement(rater1: np.ndarray, rater2: np.ndarray) -> float:
        """
        Simple percent agreement (not recommended - doesn't account for chance)
        
        Args:
            rater1: Array of ratings from rater 1
            rater2: Array of ratings from rater 2
            
        Returns:
            Proportion of agreement (0 to 1)
        """
        if len(rater1) != len(rater2):
            raise ValueError("Rater arrays must be same length")
        
        return np.mean(rater1 == rater2)
    
    @staticmethod
    def cohens_kappa(rater1: np.ndarray, rater2: np.ndarray) -> float:
        """
        Cohen's Kappa for two raters
        
        κ = (p_o - p_e) / (1 - p_e)
        
        where:
        - p_o = observed agreement
        - p_e = expected agreement by chance
        
        Interpretation:
        - < 0: Less than chance agreement
        - 0.01-0.20: Slight agreement
        - 0.21-0.40: Fair agreement
        - 0.41-0.60: Moderate agreement
        - 0.61-0.80: Substantial agreement
        - 0.81-1.00: Almost perfect agreement
        """
        if len(rater1) != len(rater2):
            raise ValueError("Rater arrays must be same length")
        
        # Observed agreement
        p_o = np.mean(rater1 == rater2)
        
        # Expected agreement
        categories = np.unique(np.concatenate([rater1, rater2]))
        p_e = 0
        
        for category in categories:
            p1 = np.mean(rater1 == category)
            p2 = np.mean(rater2 == category)
            p_e += p1 * p2
        
        # Cohen's kappa
        if p_e == 1:
            return 1.0
        
        kappa = (p_o - p_e) / (1 - p_e)
        return kappa
    
    @staticmethod
    def fleiss_kappa(ratings: np.ndarray) -> float:
        """
        Fleiss' Kappa for multiple raters
        
        Args:
            ratings: 2D array of shape (n_items, n_raters)
            
        Returns:
            Fleiss' kappa coefficient
        """
        n_items, n_raters = ratings.shape
        
        # Get unique categories
        categories = np.unique(ratings)
        n_categories = len(categories)
        
        # Count category assignments per item
        category_counts = np.zeros((n_items, n_categories))
        
        for i, category in enumerate(categories):
            category_counts[:, i] = np.sum(ratings == category, axis=1)
        
        # Calculate P_i (extent of agreement for item i)
        P_i = (np.sum(category_counts ** 2, axis=1) - n_raters) / (n_raters * (n_raters - 1))
        
        # Mean observed agreement
        P_bar = np.mean(P_i)
        
        # Expected agreement
        p_j = np.sum(category_counts, axis=0) / (n_items * n_raters)
        P_e_bar = np.sum(p_j ** 2)
        
        # Fleiss' kappa
        if P_e_bar == 1:
            return 1.0
        
        kappa = (P_bar - P_e_bar) / (1 - P_e_bar)
        return kappa
    
    @staticmethod
    def krippendorff_alpha(ratings: np.ndarray, level_of_measurement: str = 'nominal') -> float:
        """
        Krippendorff's Alpha - most flexible reliability measure
        
        Handles:
        - Missing data
        - Any number of raters
        - Different levels of measurement
        
        Args:
            ratings: 2D array of shape (n_raters, n_items)
                    Use np.nan for missing values
            level_of_measurement: 'nominal', 'ordinal', 'interval', or 'ratio'
            
        Returns:
            Alpha coefficient
        """
        n_raters, n_items = ratings.shape
        
        # Define distance function based on level of measurement
        if level_of_measurement == 'nominal':
            def distance(c, k):
                return 0 if c == k else 1
        elif level_of_measurement == 'ordinal':
            def distance(c, k):
                # For ordinal data, distance is based on cumulative proportions
                return (c - k) ** 2
        elif level_of_measurement in ['interval', 'ratio']:
            def distance(c, k):
                return (c - k) ** 2
        else:
            raise ValueError(f"Unknown level of measurement: {level_of_measurement}")
        
        # Create coincidence matrix
        categories = np.unique(ratings[~np.isnan(ratings)])
        n_c = len(categories)
        
        coincidence_matrix = np.zeros((n_c, n_c))
        
        # Count coincidences
        for item_idx in range(n_items):
            item_ratings = ratings[:, item_idx]
            item_ratings = item_ratings[~np.isnan(item_ratings)]
            
            m_u = len(item_ratings)
            if m_u < 2:
                continue
            
            for c_idx, c in enumerate(categories):
                for k_idx, k in enumerate(categories):
                    n_ck = np.sum((item_ratings == c)[:, None] * (item_ratings == k))
                    if c == k:
                        n_ck -= np.sum(item_ratings == c)
                    coincidence_matrix[c_idx, k_idx] += n_ck / (m_u - 1)
        
        # Calculate observed and expected disagreement
        n_values = np.sum(coincidence_matrix)
        
        if n_values == 0:
            return np.nan
        
        D_o = 0
        D_e = 0
        
        for c_idx, c in enumerate(categories):
            for k_idx, k in enumerate(categories):
                if c_idx != k_idx:
                    d = distance(c, k)
                    D_o += coincidence_matrix[c_idx, k_idx] * d
                    
                    n_c = np.sum(coincidence_matrix[c_idx, :])
                    n_k = np.sum(coincidence_matrix[:, k_idx])
                    D_e += n_c * n_k * d
        
        D_o /= n_values
        D_e /= (n_values * (n_values - 1))
        
        # Krippendorff's alpha
        if D_e == 0:
            return 1.0
        
        alpha = 1 - (D_o / D_e)
        return alpha
    
    @staticmethod
    def icc(ratings: np.ndarray, icc_type: str = 'ICC(2,1)') -> float:
        """
        Intraclass Correlation Coefficient
        
        Args:
            ratings: 2D array of shape (n_subjects, n_raters)
            icc_type: Type of ICC
                - ICC(1,1): Each subject rated by different raters
                - ICC(2,1): Random sample of raters
                - ICC(3,1): Fixed set of raters
                
        Returns:
            ICC value
        """
        n_subjects, n_raters = ratings.shape
        
        # Calculate mean squares
        mean_ratings = np.mean(ratings, axis=1)
        grand_mean = np.mean(ratings)
        
        # Between-subjects mean square
        MS_between = n_raters * np.sum((mean_ratings - grand_mean) ** 2) / (n_subjects - 1)
        
        # Within-subjects mean square
        MS_within = np.sum((ratings - mean_ratings[:, None]) ** 2) / (n_subjects * (n_raters - 1))
        
        # Mean square for raters
        mean_per_rater = np.mean(ratings, axis=0)
        MS_raters = n_subjects * np.sum((mean_per_rater - grand_mean) ** 2) / (n_raters - 1)
        
        # Mean square error
        MS_error = (n_subjects * (n_raters - 1) * MS_within - (n_subjects - 1) * (MS_raters - MS_within)) / ((n_subjects - 1) * (n_raters - 1))
        
        if icc_type == 'ICC(1,1)':
            icc = (MS_between - MS_within) / (MS_between + (n_raters - 1) * MS_within)
        elif icc_type == 'ICC(2,1)':
            icc = (MS_between - MS_error) / (MS_between + (n_raters - 1) * MS_error + n_raters * (MS_raters - MS_error) / n_subjects)
        elif icc_type == 'ICC(3,1)':
            icc = (MS_between - MS_error) / (MS_between + (n_raters - 1) * MS_error)
        else:
            raise ValueError(f"Unknown ICC type: {icc_type}")
        
        return icc

# Example usage with synthetic data
np.random.seed(42)

# Binary classification task - 100 examples, 3 raters
n_examples = 100
n_raters = 3

# Generate ratings with some agreement
base_truth = np.random.randint(0, 2, n_examples)
ratings_matrix = np.zeros((n_raters, n_examples), dtype=int)

for i in range(n_raters):
    # Each rater agrees with truth 80% of the time
    ratings_matrix[i] = base_truth.copy()
    flip_mask = np.random.random(n_examples) > 0.8
    ratings_matrix[i, flip_mask] = 1 - ratings_matrix[i, flip_mask]

# Calculate metrics
irr = InterRaterReliability()

# Pairwise Cohen's kappa
kappa_01 = irr.cohens_kappa(ratings_matrix[0], ratings_matrix[1])
kappa_02 = irr.cohens_kappa(ratings_matrix[0], ratings_matrix[2])
kappa_12 = irr.cohens_kappa(ratings_matrix[1], ratings_matrix[2])

print("Cohen's Kappa (pairwise):")
print(f"  Rater 0-1: {kappa_01:.3f}")
print(f"  Rater 0-2: {kappa_02:.3f}")
print(f"  Rater 1-2: {kappa_12:.3f}")

# Fleiss' kappa
fleiss = irr.fleiss_kappa(ratings_matrix.T)
print(f"\nFleiss' Kappa: {fleiss:.3f}")

# Krippendorff's alpha
alpha = irr.krippendorff_alpha(ratings_matrix, level_of_measurement='nominal')
print(f"Krippendorff's Alpha: {alpha:.3f}")

18.1.3 ML-Specific Agreement Metrics

from sklearn.metrics import confusion_matrix, precision_recall_fscore_support
import matplotlib.pyplot as plt
import seaborn as sns

class MLAnnotationMetrics:
    """
    Metrics specific to ML annotation quality
    """
    
    @staticmethod
    def multi_annotator_confusion_matrix(
        gold_labels: np.ndarray,
        annotator_labels: List[np.ndarray],
        labels: List[str]
    ) -> dict:
        """
        Create confusion matrices for each annotator vs gold standard
        
        Args:
            gold_labels: Ground truth labels
            annotator_labels: List of label arrays, one per annotator
            labels: List of label names
            
        Returns:
            Dict with confusion matrices and metrics per annotator
        """
        results = {}
        
        for i, ann_labels in enumerate(annotator_labels):
            cm = confusion_matrix(gold_labels, ann_labels, labels=range(len(labels)))
            
            precision, recall, f1, support = precision_recall_fscore_support(
                gold_labels, ann_labels, average='macro', zero_division=0
            )
            
            results[f'annotator_{i}'] = {
                'confusion_matrix': cm,
                'precision': precision,
                'recall': recall,
                'f1': f1,
                'accuracy': np.mean(gold_labels == ann_labels)
            }
        
        return results
    
    @staticmethod
    def entropy_disagreement(annotations: np.ndarray) -> np.ndarray:
        """
        Calculate entropy of annotations per example
        
        High entropy = high disagreement = difficult example
        
        Args:
            annotations: 2D array (n_examples, n_annotators)
            
        Returns:
            Array of entropy values per example
        """
        n_examples, n_annotators = annotations.shape
        entropies = np.zeros(n_examples)
        
        for i in range(n_examples):
            labels, counts = np.unique(annotations[i], return_counts=True)
            probs = counts / n_annotators
            entropy = -np.sum(probs * np.log2(probs + 1e-10))
            entropies[i] = entropy
        
        return entropies
    
    @staticmethod
    def identify_difficult_examples(
        annotations: np.ndarray,
        threshold: float = 0.5,
        min_annotators: int = 3
    ) -> np.ndarray:
        """
        Identify examples with high disagreement
        
        Args:
            annotations: 2D array (n_examples, n_annotators)
            threshold: Entropy threshold for "difficult"
            min_annotators: Minimum annotators required
            
        Returns:
            Boolean array indicating difficult examples
        """
        if annotations.shape[1] < min_annotators:
            raise ValueError(f"Need at least {min_annotators} annotators")
        
        entropies = MLAnnotationMetrics.entropy_disagreement(annotations)
        max_possible_entropy = np.log2(annotations.shape[1])
        normalized_entropy = entropies / max_possible_entropy
        
        return normalized_entropy > threshold
    
    @staticmethod
    def annotator_skill_estimation(
        annotations: np.ndarray,
        gold_labels: np.ndarray = None
    ) -> pd.DataFrame:
        """
        Estimate annotator skill levels
        
        If gold labels available: direct accuracy comparison
        If not: use peer agreement as proxy
        
        Args:
            annotations: 2D array (n_examples, n_annotators)
            gold_labels: Optional ground truth labels
            
        Returns:
            DataFrame with annotator metrics
        """
        n_examples, n_annotators = annotations.shape
        
        metrics = []
        
        for i in range(n_annotators):
            annotator_labels = annotations[:, i]
            
            if gold_labels is not None:
                # Direct accuracy
                accuracy = np.mean(annotator_labels == gold_labels)
                metric_name = 'accuracy_vs_gold'
            else:
                # Agreement with majority vote
                majority_vote = []
                for j in range(n_examples):
                    other_labels = np.concatenate([
                        annotations[j, :i],
                        annotations[j, i+1:]
                    ])
                    # Most common label (excluding this annotator)
                    values, counts = np.unique(other_labels, return_counts=True)
                    majority_vote.append(values[np.argmax(counts)])
                
                majority_vote = np.array(majority_vote)
                accuracy = np.mean(annotator_labels == majority_vote)
                metric_name = 'peer_agreement'
            
            # Consistency (self-agreement on duplicated examples would go here)
            
            metrics.append({
                'annotator_id': i,
                metric_name: accuracy,
                'total_annotations': n_examples
            })
        
        return pd.DataFrame(metrics)

# Example usage
np.random.seed(42)

# Simulate annotation scenario
n_examples = 200
n_annotators = 5
n_classes = 3

# Generate "true" labels with varying difficulty
difficulty = np.random.beta(2, 5, n_examples)  # Most examples are "easy"
true_labels = np.random.randint(0, n_classes, n_examples)

# Generate annotations based on difficulty
annotations = np.zeros((n_examples, n_annotators), dtype=int)

for i in range(n_examples):
    # Harder examples have more disagreement
    error_rate = difficulty[i] * 0.5  # Up to 50% error on hardest examples
    
    for j in range(n_annotators):
        if np.random.random() < error_rate:
            # Make error
            wrong_labels = [l for l in range(n_classes) if l != true_labels[i]]
            annotations[i, j] = np.random.choice(wrong_labels)
        else:
            annotations[i, j] = true_labels[i]

# Calculate metrics
ml_metrics = MLAnnotationMetrics()

# Identify difficult examples
difficult = ml_metrics.identify_difficult_examples(annotations, threshold=0.3)
print(f"Difficult examples: {np.sum(difficult)} / {n_examples} ({np.sum(difficult)/n_examples*100:.1f}%)")

# Estimate annotator skills
annotator_skills = ml_metrics.annotator_skill_estimation(annotations, true_labels)
print("\nAnnotator Skills:")
print(annotator_skills)

# Calculate entropy for all examples
entropies = ml_metrics.entropy_disagreement(annotations)

# Plot entropy distribution
fig, ax = plt.subplots(1, 1, figsize=(10, 5))
ax.hist(entropies, bins=30, edgecolor='black', alpha=0.7)
ax.axvline(np.mean(entropies), color='red', linestyle='--', label=f'Mean: {np.mean(entropies):.3f}')
ax.set_xlabel('Entropy (Disagreement)')
ax.set_ylabel('Number of Examples')
ax.set_title('Distribution of Annotation Disagreement')
ax.legend()
plt.tight_layout()
plt.savefig('entropy_distribution.png', dpi=150, bbox_inches='tight')
plt.close()

print("\nEntropy statistics:")
print(f"  Mean: {np.mean(entropies):.3f}")
print(f"  Std: {np.std(entropies):.3f}")
print(f"  Min: {np.min(entropies):.3f}")
print(f"  Max: {np.max(entropies):.3f}")

18.2 Dawid-Skene Model

18.2.1 Theory

The Dawid-Skene model is a probabilistic approach to aggregate labels from multiple annotators, accounting for varying annotator quality.

Model components:

  1. True labels (\(T_i\)): Unknown ground truth for item \(i\)
  2. Observed labels (\(L_{ij}\)): Label from annotator \(j\) for item \(i\)
  3. Prior (\(\pi_k\)): Probability that true label is class \(k\)
  4. Error rates (\(\theta_{jkl}\)): Probability annotator \(j\) labels class \(k\) as \(l\)

Likelihood:

\[P(L_{ij} = l | T_i = k, \theta_j) = \theta_{jkl}\]

Estimation via EM:

E-step: Estimate posterior probability of true label

\[P(T_i = k | L_i, \theta) \propto \pi_k \prod_{j=1}^{J} \theta_{jk,L_{ij}}\]

M-step: Update parameters

\[\pi_k = \frac{1}{I}\sum_{i=1}^{I} P(T_i = k | L_i, \theta)\]

\[\theta_{jkl} = \frac{\sum_{i=1}^{I} P(T_i = k | L_i, \theta) \mathbb{1}[L_{ij} = l]}{\sum_{i=1}^{I} P(T_i = k | L_i, \theta)}\]

18.2.2 Implementation

import numpy as np
from typing import Dict, List, Tuple
import pandas as pd

class DawidSkeneModel:
    """
    Dawid-Skene model for multi-annotator label aggregation
    
    Estimates:
    1. True labels for each example
    2. Error rates for each annotator
    3. Class priors
    """
    
    def __init__(self, n_classes: int, n_annotators: int):
        """
        Args:
            n_classes: Number of label classes
            n_annotators: Number of annotators
        """
        self.n_classes = n_classes
        self.n_annotators = n_annotators
        
        # Parameters to estimate
        self.class_priors = np.ones(n_classes) / n_classes
        self.error_rates = np.zeros((n_annotators, n_classes, n_classes))
        
        # Initialize error rates with diagonal dominance
        for j in range(n_annotators):
            self.error_rates[j] = np.eye(n_classes) * 0.7 + 0.3 / n_classes
        
        self.true_label_probs = None
    
    def fit(self, annotations: np.ndarray, max_iter: int = 100, 
            tol: float = 1e-4, verbose: bool = False) -> 'DawidSkeneModel':
        """
        Fit Dawid-Skene model using EM algorithm
        
        Args:
            annotations: 2D array (n_examples, n_annotators)
                        Use -1 for missing annotations
            max_iter: Maximum EM iterations
            tol: Convergence tolerance
            verbose: Print iteration info
            
        Returns:
            self
        """
        n_examples = annotations.shape[0]
        
        # Initialize true label probabilities
        self.true_label_probs = np.ones((n_examples, self.n_classes)) / self.n_classes
        
        prev_likelihood = -np.inf
        
        for iteration in range(max_iter):
            # E-step: Estimate true labels
            self._e_step(annotations)
            
            # M-step: Update parameters
            self._m_step(annotations)
            
            # Calculate likelihood
            likelihood = self._calculate_likelihood(annotations)
            
            if verbose and iteration % 10 == 0:
                print(f"Iteration {iteration}: Likelihood = {likelihood:.4f}")
            
            # Check convergence
            if abs(likelihood - prev_likelihood) < tol:
                if verbose:
                    print(f"Converged after {iteration} iterations")
                break
            
            prev_likelihood = likelihood
        
        return self
    
    def _e_step(self, annotations: np.ndarray):
        """E-step: Estimate posterior probabilities of true labels"""
        n_examples = annotations.shape[0]
        
        for i in range(n_examples):
            for k in range(self.n_classes):
                # Start with prior
                prob = self.class_priors[k]
                
                # Multiply by likelihood from each annotator
                for j in range(self.n_annotators):
                    label = annotations[i, j]
                    if label != -1:  # Skip missing annotations
                        prob *= self.error_rates[j, k, label]
                
                self.true_label_probs[i, k] = prob
            
            # Normalize
            self.true_label_probs[i] /= np.sum(self.true_label_probs[i])
    
    def _m_step(self, annotations: np.ndarray):
        """M-step: Update model parameters"""
        n_examples = annotations.shape[0]
        
        # Update class priors
        self.class_priors = np.mean(self.true_label_probs, axis=0)
        
        # Update error rates
        for j in range(self.n_annotators):
            for k in range(self.n_classes):
                for l in range(self.n_classes):
                    numerator = 0
                    denominator = 0
                    
                    for i in range(n_examples):
                        if annotations[i, j] != -1:
                            prob_true_k = self.true_label_probs[i, k]
                            denominator += prob_true_k
                            
                            if annotations[i, j] == l:
                                numerator += prob_true_k
                    
                    if denominator > 0:
                        self.error_rates[j, k, l] = numerator / denominator
                    else:
                        # Default to uniform if no data
                        self.error_rates[j, k, l] = 1.0 / self.n_classes
    
    def _calculate_likelihood(self, annotations: np.ndarray) -> float:
        """Calculate log-likelihood of current parameters"""
        n_examples = annotations.shape[0]
        log_likelihood = 0
        
        for i in range(n_examples):
            example_likelihood = 0
            
            for k in range(self.n_classes):
                prob = self.class_priors[k]
                
                for j in range(self.n_annotators):
                    label = annotations[i, j]
                    if label != -1:
                        prob *= self.error_rates[j, k, label]
                
                example_likelihood += prob
            
            log_likelihood += np.log(example_likelihood + 1e-10)
        
        return log_likelihood
    
    def predict(self) -> np.ndarray:
        """
        Get predicted true labels
        
        Returns:
            Array of predicted labels
        """
        return np.argmax(self.true_label_probs, axis=1)
    
    def predict_proba(self) -> np.ndarray:
        """
        Get probability distributions over true labels
        
        Returns:
            2D array (n_examples, n_classes)
        """
        return self.true_label_probs
    
    def get_annotator_quality(self) -> pd.DataFrame:
        """
        Calculate quality metrics for each annotator
        
        Returns:
            DataFrame with annotator quality metrics
        """
        metrics = []
        
        for j in range(self.n_annotators):
            # Accuracy (diagonal of error matrix)
            accuracy = np.mean(np.diag(self.error_rates[j]))
            
            # Worst-case accuracy (minimum diagonal element)
            min_accuracy = np.min(np.diag(self.error_rates[j]))
            
            # Confusion (off-diagonal mass)
            confusion = 1 - accuracy
            
            metrics.append({
                'annotator_id': j,
                'accuracy': accuracy,
                'min_class_accuracy': min_accuracy,
                'confusion_rate': confusion
            })
        
        return pd.DataFrame(metrics)
    
    def visualize_error_matrices(self, annotator_ids: List[int] = None, 
                                 class_names: List[str] = None):
        """
        Visualize error matrices for annotators
        
        Args:
            annotator_ids: Which annotators to plot (default: all)
            class_names: Names for classes (default: 0, 1, 2, ...)
        """
        import matplotlib.pyplot as plt
        import seaborn as sns
        
        if annotator_ids is None:
            annotator_ids = range(self.n_annotators)
        
        if class_names is None:
            class_names = [str(i) for i in range(self.n_classes)]
        
        n_plots = len(annotator_ids)
        fig, axes = plt.subplots(1, n_plots, figsize=(5 * n_plots, 4))
        
        if n_plots == 1:
            axes = [axes]
        
        for idx, annotator_id in enumerate(annotator_ids):
            sns.heatmap(
                self.error_rates[annotator_id],
                annot=True,
                fmt='.3f',
                cmap='YlOrRd',
                xticklabels=class_names,
                yticklabels=class_names,
                ax=axes[idx],
                cbar_kws={'label': 'Probability'}
            )
            axes[idx].set_title(f'Annotator {annotator_id} Error Matrix')
            axes[idx].set_xlabel('Observed Label')
            axes[idx].set_ylabel('True Label')
        
        plt.tight_layout()
        plt.savefig('dawid_skene_error_matrices.png', dpi=150, bbox_inches='tight')
        plt.close()

# Example usage with synthetic data
np.random.seed(42)

# Create synthetic annotation scenario
n_examples = 500
n_annotators = 5
n_classes = 3

# Generate true labels
true_labels = np.random.randint(0, n_classes, n_examples)

# Create annotations with varying quality annotators
annotations = np.full((n_examples, n_annotators), -1, dtype=int)

# Annotator quality levels
annotator_accuracy = [0.95, 0.85, 0.75, 0.65, 0.55]

for j in range(n_annotators):
    # Each annotator sees 80% of examples
    annotated_mask = np.random.random(n_examples) < 0.8
    
    for i in range(n_examples):
        if annotated_mask[i]:
            if np.random.random() < annotator_accuracy[j]:
                # Correct label
                annotations[i, j] = true_labels[i]
            else:
                # Random error
                wrong_labels = [l for l in range(n_classes) if l != true_labels[i]]
                annotations[i, j] = np.random.choice(wrong_labels)

# Fit Dawid-Skene model
ds_model = DawidSkeneModel(n_classes=n_classes, n_annotators=n_annotators)
ds_model.fit(annotations, verbose=True)

# Get predictions
predicted_labels = ds_model.predict()

# Calculate accuracy vs true labels
accuracy = np.mean(predicted_labels == true_labels)
print(f"\nDawid-Skene Accuracy: {accuracy:.3f}")

# Compare to majority vote
def majority_vote(annotations):
    predictions = []
    for i in range(annotations.shape[0]):
        labels = annotations[i][annotations[i] != -1]
        if len(labels) > 0:
            values, counts = np.unique(labels, return_counts=True)
            predictions.append(values[np.argmax(counts)])
        else:
            predictions.append(0)  # Default
    return np.array(predictions)

mv_predictions = majority_vote(annotations)
mv_accuracy = np.mean(mv_predictions == true_labels)
print(f"Majority Vote Accuracy: {mv_accuracy:.3f}")

# Annotator quality metrics
quality_metrics = ds_model.get_annotator_quality()
print("\nAnnotator Quality (Estimated by Dawid-Skene):")
print(quality_metrics)

print("\nTrue Annotator Quality:")
for j, acc in enumerate(annotator_accuracy):
    print(f"  Annotator {j}: {acc:.3f}")

# Visualize error matrices
ds_model.visualize_error_matrices(annotator_ids=[0, 2, 4], 
                                  class_names=['Class A', 'Class B', 'Class C'])

18.2.3 When to Use Dawid-Skene

Table 18.2: Decision guide for label aggregation methods
Code
import pandas as pd

aggregation_methods = pd.DataFrame({
    'Method': [
        'Majority Vote',
        'Weighted Vote',
        'Dawid-Skene',
        'MACE',
        'GLAD',
        'Expert Adjudication'
    ],
    'Best When': [
        'All annotators roughly equal quality',
        'Known annotator quality scores',
        'Unknown annotator quality, sufficient data',
        'Bayesian approach preferred',
        'Item difficulty varies greatly',
        'High-stakes, low-volume'
    ],
    'Computational Cost': [
        'O(n)',
        'O(n)',
        'O(n × k × iter)',
        'O(n × k × iter)',
        'O(n × k × iter)',
        'O(n × expert_time)'
    ],
    'Min Annotators': [
        3,
        3,
        3,
        3,
        3,
        1
    ],
    'Handles Missing Data': [
        'Yes',
        'Yes',
        'Yes',
        'Yes',
        'Yes',
        'N/A'
    ],
    'Output': [
        'Hard labels',
        'Hard labels',
        'Soft labels + annotator quality',
        'Soft labels + annotator competence',
        'Soft labels + item difficulty',
        'Hard labels'
    ],
    'Implementation': [
        'scipy.stats.mode',
        'Custom',
        'Custom (shown above)',
        'GitHub: dirko/mace',
        'GitHub: welinder',
        'Human review'
    ]
})

aggregation_methods

19 Label Studio: Implementation

19.1 Installation & Setup

19.1.2 Python Installation

# Install via pip
pip install label-studio

# Or install with ML backend support
pip install label-studio[ml]

# Run server
label-studio start

# Specify port
label-studio start --port 8080

# With custom data directory
label-studio start --data-dir ./my-label-studio-data

19.1.3 Configuration Files

Code
# config.json - Label Studio configuration
{
  "port": 8080,
  "host": "0.0.0.0",
  "data_dir": "./data",
  "database": "label_studio.sqlite3",
  "allow_signup": false,
  "username": "admin@example.com",
  "password": "secure_password_here"
}

19.2 Project Setup

19.2.1 Creating Projects Programmatically

Code
import requests
import json

# Label Studio API endpoint
BASE_URL = "http://localhost:8080"
API_KEY = "your_api_key_here"  # Get from Account & Settings

headers = {
    "Authorization": f"Token {API_KEY}",
    "Content-Type": "application/json"
}

# Create NER project
ner_project = {
    "title": "Named Entity Recognition",
    "description": "Annotate entities in text documents",
    "label_config": '''
    <View>
      <Text name="text" value="$text"/>
      <Labels name="label" toName="text">
        <Label value="PERSON" background="red"/>
        <Label value="ORG" background="blue"/>
        <Label value="LOC" background="green"/>
      </Labels>
    </View>
    ''',
    "sampling": "uniform",
    "show_collab_predictions": False
}

response = requests.post(
    f"{BASE_URL}/api/projects",
    headers=headers,
    json=ner_project
)

project_id = response.json()['id']
print(f"Created project ID: {project_id}")

19.3 Data Import

19.3.1 Importing Tasks

Code
# Import tasks from JSON
tasks = [
    {
        "data": {
            "text": "Apple Inc. CEO Tim Cook announced new products."
        }
    },
    {
        "data": {
            "text": "Microsoft is headquartered in Redmond, Washington."
        }
    }
]

response = requests.post(
    f"{BASE_URL}/api/projects/{project_id}/import",
    headers=headers,
    json=tasks
)

print(f"Imported {len(tasks)} tasks")

19.3.2 Import from Files

Code
import pandas as pd

# Create dataset
df = pd.DataFrame({
    'text': [
        'Example text 1',
        'Example text 2',
        'Example text 3'
    ],
    'metadata': [
        {'source': 'web'},
        {'source': 'pdf'},
        {'source': 'api'}
    ]
})

# Convert to Label Studio format
tasks = []
for idx, row in df.iterrows():
    tasks.append({
        'data': {
            'text': row['text'],
            'id': idx
        },
        'meta': row['metadata']
    })

# Save and import
with open('tasks.json', 'w') as f:
    json.dump(tasks, f)

# Import via file upload
files = {'file': open('tasks.json', 'rb')}
response = requests.post(
    f"{BASE_URL}/api/projects/{project_id}/import",
    headers=headers,
    files=files
)

19.4 Workflow Automation

19.4.1 Task Assignment & Routing

Code
class TaskRouter:
    """
    Intelligent task routing based on annotator performance
    """
    
    def __init__(self, api_key: str, base_url: str = "http://localhost:8080"):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {
            "Authorization": f"Token {api_key}",
            "Content-Type": "application/json"
        }
    
    def get_annotator_stats(self, project_id: int) -> pd.DataFrame:
        """Get performance stats for all annotators"""
        response = requests.get(
            f"{self.base_url}/api/projects/{project_id}/annotators",
            headers=self.headers
        )
        
        annotators = response.json()
        
        stats = []
        for ann in annotators:
            stats.append({
                'annotator_id': ann['id'],
                'email': ann['email'],
                'total_annotations': ann.get('total_annotations', 0),
                'avg_time': ann.get('avg_lead_time', 0),
                'accuracy': ann.get('accuracy', 0)  # If gold tasks exist
            })
        
        return pd.DataFrame(stats)
    
    def assign_tasks_by_skill(self, project_id: int, task_ids: List[int],
                             difficulty_scores: List[float]):
        """
        Assign tasks to annotators based on skill and task difficulty
        
        Args:
            project_id: Label Studio project ID
            task_ids: List of task IDs to assign
            difficulty_scores: Difficulty score for each task (0-1)
        """
        # Get annotator stats
        annotators = self.get_annotator_stats(project_id)
        
        # Sort annotators by accuracy
        annotators = annotators.sort_values('accuracy', ascending=False)
        
        # Assign difficult tasks to skilled annotators
        for task_id, difficulty in zip(task_ids, difficulty_scores):
            if difficulty > 0.7:
                # Hard task - assign to top annotator
                annotator = annotators.iloc[0]
            elif difficulty > 0.4:
                # Medium task - assign to middle annotator
                annotator = annotators.iloc[len(annotators)//2]
            else:
                # Easy task - can go to anyone
                annotator = annotators.sample(1).iloc[0]
            
            # Make assignment via API
            self._assign_task(project_id, task_id, annotator['annotator_id'])
    
    def _assign_task(self, project_id: int, task_id: int, annotator_id: int):
        """Assign specific task to annotator"""
        payload = {
            'task_id': task_id,
            'annotator_id': annotator_id
        }
        
        response = requests.post(
            f"{self.base_url}/api/projects/{project_id}/tasks/{task_id}/assignments",
            headers=self.headers,
            json=payload
        )
        
        return response.json()

# Example usage
router = TaskRouter(api_key=API_KEY)

# Get list of tasks
response = requests.get(
    f"{BASE_URL}/api/projects/{project_id}/tasks",
    headers=headers
)
tasks = response.json()

# Calculate difficulty (example: based on text length)
task_ids = [t['id'] for t in tasks]
difficulty_scores = [min(1.0, len(t['data']['text']) / 1000) for t in tasks]

# Assign tasks
router.assign_tasks_by_skill(project_id, task_ids, difficulty_scores)

19.4.2 Quality Control Automation

Code
class QualityController:
    """
    Automated quality control for Label Studio
    """
    
    def __init__(self, api_key: str, base_url: str = "http://localhost:8080"):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {
            "Authorization": f"Token {api_key}",
            "Content-Type": "application/json"
        }
    
    def inject_gold_tasks(self, project_id: int, gold_ratio: float = 0.1):
        """
        Inject gold standard tasks for quality monitoring
        
        Args:
            project_id: Project ID
            gold_ratio: Proportion of gold tasks (0-1)
        """
        # Get existing tasks
        response = requests.get(
            f"{self.base_url}/api/projects/{project_id}/tasks",
            headers=self.headers
        )
        tasks = response.json()
        
        # Randomly select tasks to be gold
        n_gold = int(len(tasks) * gold_ratio)
        gold_task_ids = np.random.choice(
            [t['id'] for t in tasks],
            size=n_gold,
            replace=False
        )
        
        # Mark as gold and add ground truth
        for task_id in gold_task_ids:
            # Get task
            task = next(t for t in tasks if t['id'] == task_id)
            
            # Add ground truth annotation (example for NER)
            gold_annotation = {
                'result': [
                    {
                        'value': {
                            'start': 0,
                            'end': 9,
                            'text': 'Apple Inc',
                            'labels': ['ORG']
                        },
                        'from_name': 'label',
                        'to_name': 'text',
                        'type': 'labels'
                    }
                ],
                'ground_truth': True
            }
            
            # Update task
            requests.post(
                f"{self.base_url}/api/tasks/{task_id}/annotations",
                headers=self.headers,
                json=gold_annotation
            )
    
    def check_annotator_quality(self, project_id: int, 
                               min_accuracy: float = 0.8) -> List[int]:
        """
        Check annotator quality against gold tasks
        
        Args:
            project_id: Project ID
            min_accuracy: Minimum acceptable accuracy
            
        Returns:
            List of annotator IDs below threshold
        """
        # Get all annotations
        response = requests.get(
            f"{self.base_url}/api/projects/{project_id}/annotations",
            headers=self.headers
        )
        annotations = response.json()
        
        # Calculate accuracy per annotator on gold tasks
        annotator_accuracy = {}
        
        for ann in annotations:
            if ann.get('ground_truth'):
                continue  # Skip ground truth annotations
            
            task_id = ann['task']
            annotator_id = ann['completed_by']
            
            # Get ground truth for this task
            gt_response = requests.get(
                f"{self.base_url}/api/tasks/{task_id}/annotations",
                headers=self.headers,
                params={'ground_truth': True}
            )
            ground_truth = gt_response.json()
            
            if not ground_truth:
                continue
            
            # Compare annotation to ground truth
            is_correct = self._compare_annotations(
                ann['result'],
                ground_truth[0]['result']
            )
            
            if annotator_id not in annotator_accuracy:
                annotator_accuracy[annotator_id] = []
            
            annotator_accuracy[annotator_id].append(is_correct)
        
        # Find annotators below threshold
        low_performers = []
        for annotator_id, results in annotator_accuracy.items():
            accuracy = np.mean(results)
            if accuracy < min_accuracy:
                low_performers.append(annotator_id)
        
        return low_performers
    
    def _compare_annotations(self, ann1: List[dict], ann2: List[dict]) -> bool:
        """
        Compare two annotations for equality
        
        Simplified - actual implementation depends on annotation type
        """
        # For NER, compare entity spans and labels
        if len(ann1) != len(ann2):
            return False
        
        # Sort by start position
        ann1_sorted = sorted(ann1, key=lambda x: x['value']['start'])
        ann2_sorted = sorted(ann2, key=lambda x: x['value']['start'])
        
        for a1, a2 in zip(ann1_sorted, ann2_sorted):
            if (a1['value']['start'] != a2['value']['start'] or
                a1['value']['end'] != a2['value']['end'] or
                a1['value']['labels'] != a2['value']['labels']):
                return False
        
        return True
    
    def auto_review_consensus(self, project_id: int, consensus_threshold: int = 2):
        """
        Automatically accept annotations with sufficient consensus
        
        Args:
            project_id: Project ID
            consensus_threshold: Minimum number of agreeing annotators
        """
        # Get tasks with multiple annotations
        response = requests.get(
            f"{self.base_url}/api/projects/{project_id}/tasks",
            headers=self.headers,
            params={'annotations__gt': 1}
        )
        tasks = response.json()
        
        for task in tasks:
            task_id = task['id']
            
            # Get all annotations for task
            ann_response = requests.get(
                f"{self.base_url}/api/tasks/{task_id}/annotations",
                headers=self.headers
            )
            annotations = ann_response.json()
            
            # Find consensus
            if self._has_consensus(annotations, consensus_threshold):
                # Auto-accept
                self._accept_task(task_id)
    
    def _has_consensus(self, annotations: List[dict], threshold: int) -> bool:
        """Check if annotations have consensus"""
        # Simplified - actual implementation depends on annotation type
        # For classification, check if threshold annotators agree on label
        
        if len(annotations) < threshold:
            return False
        
        # Count label frequencies
        from collections import Counter
        
        labels = [ann['result'][0]['value']['choices'][0] 
                 for ann in annotations 
                 if ann['result']]
        
        most_common = Counter(labels).most_common(1)
        
        if most_common and most_common[0][1] >= threshold:
            return True
        
        return False
    
    def _accept_task(self, task_id: int):
        """Mark task as accepted"""
        requests.patch(
            f"{self.base_url}/api/tasks/{task_id}",
            headers=self.headers,
            json={'is_labeled': True}
        )

# Example usage
qc = QualityController(api_key=API_KEY)

# Inject 10% gold tasks
qc.inject_gold_tasks(project_id, gold_ratio=0.1)

# Check annotator quality
low_performers = qc.check_annotator_quality(project_id, min_accuracy=0.8)
print(f"Annotators below threshold: {low_performers}")

# Auto-review with consensus
qc.auto_review_consensus(project_id, consensus_threshold=3)

19.5 ML Backend Integration

19.5.1 Pre-annotation with Models

Code
from label_studio_ml.api import LabelStudioMLBase, init_app
from label_studio_ml.utils import get_single_tag_keys
import torch
from transformers import pipeline

class NERPredictor(LabelStudioMLBase):
    """
    Custom ML backend for NER pre-annotation
    """
    
    def __init__(self, **kwargs):
        super(NERPredictor, self).__init__(**kwargs)
        
        # Load pre-trained model
        self.model = pipeline(
            "ner",
            model="dslim/bert-base-NER",
            aggregation_strategy="simple"
        )
        
        # Map model labels to Label Studio labels
        self.label_map = {
            'PER': 'PERSON',
            'ORG': 'ORG',
            'LOC': 'LOC',
            'MISC': 'MISC'
        }
    
    def predict(self, tasks, **kwargs):
        """
        Generate predictions for tasks
        
        Args:
            tasks: List of Label Studio tasks
            
        Returns:
            List of predictions in Label Studio format
        """
        predictions = []
        
        for task in tasks:
            text = task['data']['text']
            
            # Run NER model
            entities = self.model(text)
            
            # Convert to Label Studio format
            results = []
            for entity in entities:
                # Map label
                label = self.label_map.get(
                    entity['entity_group'],
                    entity['entity_group']
                )
                
                results.append({
                    'from_name': 'label',
                    'to_name': 'text',
                    'type': 'labels',
                    'value': {
                        'start': entity['start'],
                        'end': entity['end'],
                        'text': entity['word'],
                        'labels': [label]
                    },
                    'score': entity['score']
                })
            
            predictions.append({
                'result': results,
                'score': np.mean([r['score'] for r in results]) if results else 0,
                'model_version': 'bert-base-NER'
            })
        
        return predictions
    
    def fit(self, completions, workdir=None, **kwargs):
        """
        Fine-tune model on completed annotations
        
        Args:
            completions: Completed annotations from Label Studio
            workdir: Working directory for model storage
        """
        # Extract training data
        texts = []
        labels = []
        
        for completion in completions:
            text = completion['data']['text']
            annotations = completion['annotations'][0]['result']
            
            # Convert to token-level labels
            tokens = text.split()
            token_labels = ['O'] * len(tokens)
            
            for ann in annotations:
                start = ann['value']['start']
                end = ann['value']['end']
                label = ann['value']['labels'][0]
                
                # Find tokens in span
                # (Simplified - actual implementation needs word tokenization)
                token_labels = self._label_tokens(
                    text, tokens, start, end, label
                )
            
            texts.append(tokens)
            labels.append(token_labels)
        
        # Fine-tune model (pseudo-code - actual implementation varies)
        # self.model.fine_tune(texts, labels, epochs=3)
        
        return {'model_version': 'fine-tuned-v1'}

# Run ML backend server
if __name__ == '__main__':
    app = init_app(NERPredictor)
    app.run(host='0.0.0.0', port=9090)

19.5.2 Active Learning Loop

Code
class ActiveLearningManager:
    """
    Manage active learning loop for efficient annotation
    """
    
    def __init__(self, api_key: str, ml_backend_url: str):
        self.api_key = api_key
        self.ml_backend_url = ml_backend_url
        self.base_url = "http://localhost:8080"
        self.headers = {
            "Authorization": f"Token {api_key}",
            "Content-Type": "application/json"
        }
    
    def select_uncertain_samples(self, project_id: int, 
                                n_samples: int = 100,
                                strategy: str = 'entropy') -> List[int]:
        """
        Select most uncertain samples for annotation
        
        Args:
            project_id: Label Studio project ID
            n_samples: Number of samples to select
            strategy: 'entropy', 'margin', or 'random'
            
        Returns:
            List of task IDs to annotate
        """
        # Get all unlabeled tasks
        response = requests.get(
            f"{self.base_url}/api/projects/{project_id}/tasks",
            headers=self.headers,
            params={'is_labeled': False}
        )
        tasks = response.json()
        
        if not tasks:
            return []
        
        # Get predictions from ML backend
        task_ids = [t['id'] for t in tasks]
        predictions_response = requests.post(
            f"{self.ml_backend_url}/predict",
            json={'tasks': tasks}
        )
        predictions = predictions_response.json()
        
        # Calculate uncertainty scores
        uncertainties = []
        
        for task, pred in zip(tasks, predictions):
            if strategy == 'entropy':
                # Calculate entropy from prediction scores
                scores = [r['score'] for r in pred['result']]
                if scores:
                    # Normalize scores to probabilities
                    probs = np.array(scores) / np.sum(scores)
                    entropy = -np.sum(probs * np.log(probs + 1e-10))
                    uncertainties.append(entropy)
                else:
                    uncertainties.append(0)
            
            elif strategy == 'margin':
                # Margin sampling - difference between top 2 predictions
                scores = sorted([r['score'] for r in pred['result']], reverse=True)
                if len(scores) >= 2:
                    margin = scores[0] - scores[1]
                    uncertainties.append(1 - margin)  # Lower margin = higher uncertainty
                else:
                    uncertainties.append(0)
            
            elif strategy == 'random':
                uncertainties.append(np.random.random())
        
        # Select top-k uncertain samples
        uncertain_indices = np.argsort(uncertainties)[-n_samples:]
        selected_task_ids = [task_ids[i] for i in uncertain_indices]
        
        return selected_task_ids
    
    def run_active_learning_cycle(self, project_id: int, 
                                  n_iterations: int = 5,
                                  samples_per_iter: int = 100):
        """
        Run complete active learning cycle
        
        1. Train model on labeled data
        2. Select uncertain samples
        3. Send for annotation
        4. Wait for completion
        5. Repeat
        """
        for iteration in range(n_iterations):
            print(f"\n=== Active Learning Iteration {iteration + 1} ===")
            
            # Step 1: Train model on current labeled data
            self._trigger_model_training(project_id)
            
            # Step 2: Select uncertain samples
            selected_tasks = self.select_uncertain_samples(
                project_id,
                n_samples=samples_per_iter,
                strategy='entropy'
            )
            
            print(f"Selected {len(selected_tasks)} uncertain tasks")
            
            # Step 3: Prioritize these tasks for annotation
            self._prioritize_tasks(project_id, selected_tasks)
            
            # Step 4: Wait for annotations (in practice, this would be async)
            print("Waiting for annotations...")
            self._wait_for_annotations(project_id, selected_tasks)
            
            # Step 5: Evaluate progress
            labeled_count = self._get_labeled_count(project_id)
            print(f"Total labeled tasks: {labeled_count}")
    
    def _trigger_model_training(self, project_id: int):
        """Trigger model retraining on ML backend"""
        # Get completed annotations
        response = requests.get(
            f"{self.base_url}/api/projects/{project_id}/annotations",
            headers=self.headers
        )
        completions = response.json()
        
        # Send to ML backend for training
        train_response = requests.post(
            f"{self.ml_backend_url}/train",
            json={'completions': completions}
        )
        
        return train_response.json()
    
    def _prioritize_tasks(self, project_id: int, task_ids: List[int]):
        """Move tasks to top of queue"""
        for task_id in task_ids:
            requests.patch(
                f"{self.base_url}/api/tasks/{task_id}",
                headers=self.headers,
                json={'priority': 10}  # High priority
            )
    
    def _wait_for_annotations(self, project_id: int, task_ids: List[int]):
        """Wait until tasks are annotated (simplified)"""
        import time
        
        while True:
            # Check if all tasks are labeled
            all_labeled = True
            
            for task_id in task_ids:
                response = requests.get(
                    f"{self.base_url}/api/tasks/{task_id}",
                    headers=self.headers
                )
                task = response.json()
                
                if not task.get('is_labeled'):
                    all_labeled = False
                    break
            
            if all_labeled:
                break
            
            time.sleep(60)  # Check every minute
    
    def _get_labeled_count(self, project_id: int) -> int:
        """Count labeled tasks"""
        response = requests.get(
            f"{self.base_url}/api/projects/{project_id}/tasks",
            headers=self.headers,
            params={'is_labeled': True}
        )
        
        return len(response.json())

# Run active learning
al_manager = ActiveLearningManager(
    api_key=API_KEY,
    ml_backend_url="http://localhost:9090"
)

al_manager.run_active_learning_cycle(
    project_id=project_id,
    n_iterations=5,
    samples_per_iter=100
)

19.6 Export & Analysis

19.6.1 Export Annotations

Code
class AnnotationExporter:
    """
    Export annotations in various formats
    """
    
    def __init__(self, api_key: str, base_url: str = "http://localhost:8080"):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {
            "Authorization": f"Token {api_key}",
            "Content-Type": "application/json"
        }
    
    def export_to_coco(self, project_id: int, output_path: str):
        """
        Export bounding box annotations to COCO format
        
        Args:
            project_id: Label Studio project ID
            output_path: Path to save COCO JSON file
        """
        # Get all annotations
        response = requests.get(
            f"{self.base_url}/api/projects/{project_id}/export",
            headers=self.headers,
            params={'exportType': 'JSON'}
        )
        annotations = response.json()
        
        # Convert to COCO format
        coco = {
            'images': [],
            'annotations': [],
            'categories': []
        }
        
        # Build category list
        categories = set()
        for ann in annotations:
            for result in ann['annotations'][0]['result']:
                if result['type'] == 'rectanglelabels':
                    for label in result['value']['rectanglelabels']:
                        categories.add(label)
        
        coco['categories'] = [
            {'id': i, 'name': cat}
            for i, cat in enumerate(sorted(categories))
        ]
        
        category_map = {cat['name']: cat['id'] for cat in coco['categories']}
        
        # Build images and annotations
        annotation_id = 0
        
        for image_id, ann in enumerate(annotations):
            # Image info
            coco['images'].append({
                'id': image_id,
                'file_name': ann['data'].get('image', ''),
                'width': ann['data'].get('width', 0),
                'height': ann['data'].get('height', 0)
            })
            
            # Annotations for this image
            for result in ann['annotations'][0]['result']:
                if result['type'] == 'rectanglelabels':
                    bbox_value = result['value']
                    
                    # Convert percentage to pixels
                    x = bbox_value['x'] * ann['data']['width'] / 100
                    y = bbox_value['y'] * ann['data']['height'] / 100
                    w = bbox_value['width'] * ann['data']['width'] / 100
                    h = bbox_value['height'] * ann['data']['height'] / 100
                    
                    for label in bbox_value['rectanglelabels']:
                        coco['annotations'].append({
                            'id': annotation_id,
                            'image_id': image_id,
                            'category_id': category_map[label],
                            'bbox': [x, y, w, h],
                            'area': w * h,
                            'iscrowd': 0
                        })
                        annotation_id += 1
        
        # Save to file
        with open(output_path, 'w') as f:
            json.dump(coco, f, indent=2)
        
        print(f"Exported {len(coco['images'])} images with {len(coco['annotations'])} annotations")
    
    def export_to_conll(self, project_id: int, output_path: str):
        """
        Export NER annotations to CoNLL format
        
        Format:
        token1 O
        token2 B-PER
        token3 I-PER
        token4 O
        
        """
        # Get annotations
        response = requests.get(
            f"{self.base_url}/api/projects/{project_id}/export",
            headers=self.headers,
            params={'exportType': 'JSON'}
        )
        annotations = response.json()
        
        with open(output_path, 'w') as f:
            for ann in annotations:
                text = ann['data']['text']
                tokens = text.split()  # Simplified tokenization
                
                # Initialize all tokens as O (outside)
                labels = ['O'] * len(tokens)
                
                # Get entity spans
                for result in ann['annotations'][0]['result']:
                    if result['type'] == 'labels':
                        start = result['value']['start']
                        end = result['value']['end']
                        entity_type = result['value']['labels'][0]
                        
                        # Find tokens in span (simplified)
                        char_pos = 0
                        for i, token in enumerate(tokens):
                            token_start = char_pos
                            token_end = char_pos + len(token)
                            
                            # Check overlap with entity span
                            if token_start >= start and token_end <= end:
                                if token_start == start:
                                    labels[i] = f'B-{entity_type}'
                                else:
                                    labels[i] = f'I-{entity_type}'
                            
                            char_pos = token_end + 1  # +1 for space
                
                # Write tokens and labels
                for token, label in zip(tokens, labels):
                    f.write(f"{token} {label}\n")
                f.write("\n")  # Blank line between documents
        
        print(f"Exported {len(annotations)} documents to CoNLL format")
    
    def export_to_yolo(self, project_id: int, output_dir: str):
        """
        Export to YOLO format
        
        Creates:
        - images/ directory with images
        - labels/ directory with .txt files
        - data.yaml with class names
        """
        import os
        os.makedirs(f"{output_dir}/images", exist_ok=True)
        os.makedirs(f"{output_dir}/labels", exist_ok=True)
        
        # Get annotations
        response = requests.get(
            f"{self.base_url}/api/projects/{project_id}/export",
            headers=self.headers,
            params={'exportType': 'JSON'}
        )
        annotations = response.json()
        
        # Get class names
        classes = set()
        for ann in annotations:
            for result in ann['annotations'][0]['result']:
                if result['type'] == 'rectanglelabels':
                    for label in result['value']['rectanglelabels']:
                        classes.add(label)
        
        classes = sorted(classes)
        class_to_id = {cls: i for i, cls in enumerate(classes)}
        
        # Process each image
        for ann in annotations:
            image_name = os.path.basename(ann['data']['image'])
            label_name = os.path.splitext(image_name)[0] + '.txt'
            
            img_width = ann['data']['width']
            img_height = ann['data']['height']
            
            # Write label file
            with open(f"{output_dir}/labels/{label_name}", 'w') as f:
                for result in ann['annotations'][0]['result']:
                    if result['type'] == 'rectanglelabels':
                        bbox_value = result['value']
                        
                        # Convert to YOLO format (normalized center coordinates)
                        x_center = (bbox_value['x'] + bbox_value['width'] / 2) / 100
                        y_center = (bbox_value['y'] + bbox_value['height'] / 2) / 100
                        width = bbox_value['width'] / 100
                        height = bbox_value['height'] / 100
                        
                        for label in bbox_value['rectanglelabels']:
                            class_id = class_to_id[label]
                            f.write(f"{class_id} {x_center} {y_center} {width} {height}\n")
        
        # Write data.yaml
        with open(f"{output_dir}/data.yaml", 'w') as f:
            f.write(f"path: {output_dir}\n")
            f.write(f"train: images\n")
            f.write(f"val: images\n")
            f.write(f"nc: {len(classes)}\n")
            f.write(f"names: {classes}\n")
        
        print(f"Exported to YOLO format in {output_dir}")

# Example usage
exporter = AnnotationExporter(api_key=API_KEY)

# Export to different formats
exporter.export_to_coco(project_id, 'coco_annotations.json')
exporter.export_to_conll(project_id, 'ner_annotations.conll')
exporter.export_to_yolo(project_id, 'yolo_dataset')

20 Workforce Management

20.1 Compensation & Pricing

20.1.1 Fair Wage Calculator

class FairWageCalculator:
    """
    Calculate fair compensation for annotation tasks
    """
    
    def __init__(self, min_hourly_wage: float = 15.0):
        """
        Args:
            min_hourly_wage: Minimum hourly wage in USD
        """
        self.min_hourly_wage = min_hourly_wage
        self.wage_per_second = min_hourly_wage / 3600
    
    def calculate_piece_rate(self, task_type: str, 
                            pilot_times: List[float],
                            complexity_multiplier: float = 1.0) -> dict:
        """
        Calculate fair piece rate based on pilot timing
        
        Args:
            task_type: Type of task (for logging)
            pilot_times: List of completion times in seconds from pilot
            complexity_multiplier: Adjustment for task difficulty
            
        Returns:
            Dict with pricing information
        """
        median_time = np.median(pilot_times)
        p75_time = np.percentile(pilot_times, 75)
        
        # Use 75th percentile to account for learning curve
        # Add 20% buffer for quality work
        estimated_time = p75_time * 1.2 * complexity_multiplier
        
        # Calculate base rate
        base_rate = estimated_time * self.wage_per_second
        
        # Round up to nearest cent
        piece_rate = np.ceil(base_rate * 100) / 100
        
        # Calculate expected hourly rate
        tasks_per_hour = 3600 / estimated_time
        effective_hourly = piece_rate * tasks_per_hour
        
        return {
            'task_type': task_type,
            'piece_rate_usd': piece_rate,
            'median_time_sec': median_time,
            'p75_time_sec': p75_time,
            'estimated_time_sec': estimated_time,
            'tasks_per_hour': tasks_per_hour,
            'effective_hourly_usd': effective_hourly,
            'meets_minimum_wage': effective_hourly >= self.min_hourly_wage
        }
    
    def calculate_tiered_pricing(self, base_rate: float,
                                quality_tiers: dict = None) -> pd.DataFrame:
        """
        Create tiered pricing based on quality
        
        Args:
            base_rate: Base piece rate
            quality_tiers: Dict of {tier_name: multiplier}
            
        Returns:
            DataFrame with tier pricing
        """
        if quality_tiers is None:
            quality_tiers = {
                'Entry Level (< 80% accuracy)': 0.8,
                'Standard (80-90% accuracy)': 1.0,
                'Experienced (90-95% accuracy)': 1.2,
                'Expert (> 95% accuracy)': 1.5
            }
        
        tiers = []
        for tier_name, multiplier in quality_tiers.items():
            tiers.append({
                'tier': tier_name,
                'multiplier': multiplier,
                'piece_rate': base_rate * multiplier,
                'hourly_equivalent': base_rate * multiplier * 100  # Assuming 100 tasks/hr
            })
        
        return pd.DataFrame(tiers)

# Example usage with pilot data
calculator = FairWageCalculator(min_hourly_wage=15.0)

# Simulate pilot timing data for different tasks
np.random.seed(42)

task_types = {
    'Text Classification': np.random.normal(8, 2, 50),  # Mean 8 sec, std 2
    'NER (per document)': np.random.normal(45, 10, 50),
    'Bounding Box': np.random.normal(25, 8, 50),
    'Image Segmentation': np.random.normal(180, 30, 50)
}

pricing_table = []

for task_type, pilot_times in task_types.items():
    pricing = calculator.calculate_piece_rate(
        task_type,
        pilot_times,
        complexity_multiplier=1.0
    )
    pricing_table.append(pricing)

pricing_df = pd.DataFrame(pricing_table)
print("Fair Wage Pricing:")
print(pricing_df)

# Create tiered pricing for NER task
ner_rate = pricing_df[pricing_df['task_type'] == 'NER (per document)']['piece_rate_usd'].values[0]
tiered_pricing = calculator.calculate_tiered_pricing(ner_rate)
print("\nTiered Pricing for NER:")
print(tiered_pricing)

20.1.2 Client Pricing Calculator

class ClientPricingCalculator:
    """
    Calculate client pricing with markup
    """
    
    def __init__(self, cost_structure: dict = None):
        """
        Args:
            cost_structure: Dict with cost components as percentages
        """
        if cost_structure is None:
            self.cost_structure = {
                'labor': 0.40,
                'qa_review': 0.15,
                'platform': 0.10,
                'management': 0.12,
                'training': 0.05,
                'support': 0.03,
                'overhead': 0.05,
                'profit': 0.10
            }
        else:
            self.cost_structure = cost_structure
    
    def calculate_client_price(self, labor_cost: float,
                              volume_tier: str = 'standard') -> dict:
        """
        Calculate client price from labor cost
        
        Args:
            labor_cost: Direct labor cost per task
            volume_tier: 'small', 'standard', 'large', 'enterprise'
            
        Returns:
            Dict with pricing breakdown
        """
        # Volume discounts
        volume_discounts = {
            'small': 1.0,      # < 10K tasks
            'standard': 0.85,  # 10-100K
            'large': 0.75,     # 100K-1M
            'enterprise': 0.65 # > 1M
        }
        
        discount = volume_discounts.get(volume_tier, 1.0)
        
        # Calculate each cost component
        costs = {}
        costs['labor'] = labor_cost
        costs['qa_review'] = labor_cost * (self.cost_structure['qa_review'] / 
                                          self.cost_structure['labor'])
        costs['platform'] = labor_cost * (self.cost_structure['platform'] / 
                                         self.cost_structure['labor'])
        costs['management'] = labor_cost * (self.cost_structure['management'] / 
                                           self.cost_structure['labor'])
        costs['training'] = labor_cost * (self.cost_structure['training'] / 
                                         self.cost_structure['labor'])
        costs['support'] = labor_cost * (self.cost_structure['support'] / 
                                        self.cost_structure['labor'])
        costs['overhead'] = labor_cost * (self.cost_structure['overhead'] / 
                                         self.cost_structure['labor'])
        
        # Total cost before profit
        total_cost = sum(costs.values())
        
        # Add profit margin
        profit_margin = total_cost * (self.cost_structure['profit'] / 
                                     (1 - self.cost_structure['profit']))
        costs['profit'] = profit_margin
        
        # Apply volume discount
        base_price = total_cost + profit_margin
        final_price = base_price * discount
        
        return {
            'labor_cost': labor_cost,
            'cost_breakdown': costs,
            'base_price': base_price,
            'volume_tier': volume_tier,
            'volume_discount': discount,
            'final_price': final_price,
            'markup_percentage': (final_price / labor_cost - 1) * 100
        }
    
    def create_pricing_table(self, labor_costs: dict) -> pd.DataFrame:
        """
        Create comprehensive pricing table
        
        Args:
            labor_costs: Dict of {task_type: labor_cost_per_task}
            
        Returns:
            DataFrame with pricing for all volume tiers
        """
        rows = []
        
        for task_type, labor_cost in labor_costs.items():
            for tier in ['small', 'standard', 'large', 'enterprise']:
                pricing = self.calculate_client_price(labor_cost, tier)
                
                rows.append({
                    'task_type': task_type,
                    'volume_tier': tier,
                    'labor_cost': labor_cost,
                    'total_cost': sum(pricing['cost_breakdown'].values()) - pricing['cost_breakdown']['profit'],
                    'profit': pricing['cost_breakdown']['profit'],
                    'base_price': pricing['base_price'],
                    'discount': f"{(1 - pricing['volume_discount']) * 100:.0f}%",
                    'final_price': pricing['final_price'],
                    'markup': f"{pricing['markup_percentage']:.1f}%"
                })
        
        return pd.DataFrame(rows)

# Example usage
client_pricer = ClientPricingCalculator()

# Use labor costs from previous example
labor_costs = {
    'Text Classification': 0.03,
    'NER (per document)': 0.18,
    'Bounding Box': 0.10,
    'Image Segmentation': 0.72
}

pricing_table = client_pricer.create_pricing_table(labor_costs)
print("Client Pricing Table:")
print(pricing_table)

# Example: Detailed breakdown for NER task at standard volume
ner_pricing = client_pricer.calculate_client_price(
    labor_cost=0.18,
    volume_tier='standard'
)

print("\nDetailed Cost Breakdown (NER, Standard Volume):")
for component, cost in ner_pricing['cost_breakdown'].items():
    percentage = (cost / ner_pricing['final_price']) * 100
    print(f"  {component.capitalize()}: ${cost:.3f} ({percentage:.1f}%)")
print(f"\nFinal Client Price: ${ner_pricing['final_price']:.3f}")
print(f"Markup over Labor: {ner_pricing['markup_percentage']:.1f}%")

20.2 Worker Performance Tracking

20.2.1 Performance Dashboard

class WorkerPerformanceTracker:
    """
    Track and analyze worker performance metrics
    """
    
    def __init__(self):
        self.metrics = pd.DataFrame()
    
    def log_annotation(self, worker_id: str, task_id: str,
                      task_type: str, time_seconds: float,
                      quality_score: float = None, is_gold: bool = False,
                      gold_correct: bool = None):
        """
        Log single annotation event
        
        Args:
            worker_id: Worker identifier
            task_id: Task identifier
            task_type: Type of annotation task
            time_seconds: Time taken to complete
            quality_score: Quality score if available (0-1)
            is_gold: Whether this was a gold standard task
            gold_correct: Whether worker was correct on gold task
        """
        new_row = {
            'worker_id': worker_id,
            'task_id': task_id,
            'task_type': task_type,
            'time_seconds': time_seconds,
            'quality_score': quality_score,
            'is_gold': is_gold,
            'gold_correct': gold_correct,
            'timestamp': pd.Timestamp.now()
        }
        
        self.metrics = pd.concat([
            self.metrics,
            pd.DataFrame([new_row])
        ], ignore_index=True)
    
    def calculate_worker_stats(self, lookback_days: int = 30) -> pd.DataFrame:
        """
        Calculate statistics per worker
        
        Args:
            lookback_days: Number of days to include in stats
            
        Returns:
            DataFrame with worker statistics
        """
        cutoff_date = pd.Timestamp.now() - pd.Timedelta(days=lookback_days)
        recent_data = self.metrics[self.metrics['timestamp'] >= cutoff_date]
        
        stats = []
        
        for worker_id in recent_data['worker_id'].unique():
            worker_data = recent_data[recent_data['worker_id'] == worker_id]
            
            # Calculate metrics
            total_tasks = len(worker_data)
            total_time = worker_data['time_seconds'].sum()
            avg_time = worker_data['time_seconds'].mean()
            median_time = worker_data['time_seconds'].median()
            
            # Gold task performance
            gold_data = worker_data[worker_data['is_gold'] == True]
            if len(gold_data) > 0:
                accuracy = gold_data['gold_correct'].mean()
                n_gold = len(gold_data)
            else:
                accuracy = None
                n_gold = 0
            
            # Tasks per day
            days_active = (worker_data['timestamp'].max() - 
                          worker_data['timestamp'].min()).days + 1
            tasks_per_day = total_tasks / days_active if days_active > 0 else 0
            
            # Calculate consistency (coefficient of variation in time)
            cv_time = (worker_data['time_seconds'].std() / avg_time 
                      if avg_time > 0 else 0)
            
            stats.append({
                'worker_id': worker_id,
                'total_tasks': total_tasks,
                'total_hours': total_time / 3600,
                'avg_time_sec': avg_time,
                'median_time_sec': median_time,
                'cv_time': cv_time,
                'tasks_per_day': tasks_per_day,
                'gold_tasks': n_gold,
                'accuracy': accuracy,
                'days_active': days_active
            })
        
        return pd.DataFrame(stats)
    
    def identify_issues(self, worker_stats: pd.DataFrame) -> dict:
        """
        Identify potential quality or fraud issues
        
        Args:
            worker_stats: DataFrame from calculate_worker_stats()
            
        Returns:
            Dict with flagged workers and reasons
        """
        issues = {
            'too_fast': [],
            'too_slow': [],
            'low_accuracy': [],
            'suspicious_consistency': []
        }
        
        # Calculate thresholds from population
        median_time = worker_stats['median_time_sec'].median()
        q1_time = worker_stats['median_time_sec'].quantile(0.25)
        q3_time = worker_stats['median_time_sec'].quantile(0.75)
        
        for _, worker in worker_stats.iterrows():
            worker_id = worker['worker_id']
            
            # Too fast (< 50% of population median)
            if worker['median_time_sec'] < median_time * 0.5:
                issues['too_fast'].append({
                    'worker_id': worker_id,
                    'median_time': worker['median_time_sec'],
                    'population_median': median_time
                })
            
            # Too slow (> 2x population median)
            if worker['median_time_sec'] > median_time * 2:
                issues['too_slow'].append({
                    'worker_id': worker_id,
                    'median_time': worker['median_time_sec'],
                    'population_median': median_time
                })
            
            # Low accuracy on gold tasks
            if worker['accuracy'] is not None and worker['accuracy'] < 0.7:
                issues['low_accuracy'].append({
                    'worker_id': worker_id,
                    'accuracy': worker['accuracy'],
                    'n_gold': worker['gold_tasks']
                })
            
            # Suspiciously consistent timing (CV < 0.1)
            # Might indicate automation
            if worker['cv_time'] < 0.1 and worker['total_tasks'] > 20:
                issues['suspicious_consistency'].append({
                    'worker_id': worker_id,
                    'cv_time': worker['cv_time'],
                    'total_tasks': worker['total_tasks']
                })
        
        return issues
    
    def plot_worker_performance(self, worker_id: str):
        """
        Create performance visualization for single worker
        """
        import matplotlib.pyplot as plt
        import seaborn as sns
        
        worker_data = self.metrics[self.metrics['worker_id'] == worker_id]
        
        if len(worker_data) == 0:
            print(f"No data for worker {worker_id}")
            return
        
        fig, axes = plt.subplots(2, 2, figsize=(14, 10))
        
        # 1. Time series of completion time
        axes[0, 0].plot(worker_data['timestamp'], worker_data['time_seconds'], 
                       marker='o', alpha=0.6)
        axes[0, 0].set_title('Task Completion Time Over Time')
        axes[0, 0].set_xlabel('Date')
        axes[0, 0].set_ylabel('Time (seconds)')
        axes[0, 0].tick_params(axis='x', rotation=45)
        
        # 2. Distribution of completion times
        axes[0, 1].hist(worker_data['time_seconds'], bins=30, edgecolor='black', alpha=0.7)
        axes[0, 1].axvline(worker_data['time_seconds'].median(), 
                          color='red', linestyle='--', label='Median')
        axes[0, 1].set_title('Distribution of Completion Times')
        axes[0, 1].set_xlabel('Time (seconds)')
        axes[0, 1].set_ylabel('Frequency')
        axes[0, 1].legend()
        
        # 3. Accuracy on gold tasks over time
        gold_data = worker_data[worker_data['is_gold'] == True].copy()
        if len(gold_data) > 0:
            gold_data['cumulative_accuracy'] = gold_data['gold_correct'].expanding().mean()
            axes[1, 0].plot(gold_data['timestamp'], gold_data['cumulative_accuracy'],
                           marker='o', color='green')
            axes[1, 0].axhline(0.8, color='red', linestyle='--', label='80% Threshold')
            axes[1, 0].set_title('Cumulative Accuracy on Gold Tasks')
            axes[1, 0].set_xlabel('Date')
            axes[1, 0].set_ylabel('Cumulative Accuracy')
            axes[1, 0].set_ylim([0, 1])
            axes[1, 0].legend()
            axes[1, 0].tick_params(axis='x', rotation=45)
        else:
            axes[1, 0].text(0.5, 0.5, 'No Gold Task Data', 
                           ha='center', va='center', fontsize=14)
            axes[1, 0].set_title('Gold Task Performance')
        
        # 4. Tasks per day
        worker_data['date'] = worker_data['timestamp'].dt.date
        tasks_per_day = worker_data.groupby('date').size()
        axes[1, 1].bar(range(len(tasks_per_day)), tasks_per_day.values, alpha=0.7)
        axes[1, 1].set_title('Tasks Completed Per Day')
        axes[1, 1].set_xlabel('Day')
        axes[1, 1].set_ylabel('Number of Tasks')
        
        plt.suptitle(f'Performance Dashboard: Worker {worker_id}', fontsize=16)
        plt.tight_layout()
        plt.savefig(f'worker_{worker_id}_performance.png', dpi=150, bbox_inches='tight')
        plt.close()

# Example usage with synthetic data
tracker = WorkerPerformanceTracker()

# Simulate annotation logs
np.random.seed(42)
n_workers = 10
n_tasks_per_worker = 200

for worker_id in range(n_workers):
    # Worker characteristics
    base_speed = np.random.uniform(20, 60)  # Seconds per task
    accuracy = np.random.uniform(0.6, 0.95)
    
    for task_num in range(n_tasks_per_worker):
        # Generate task completion
        time_taken = np.random.normal(base_speed, base_speed * 0.2)
        time_taken = max(5, time_taken)  # Minimum 5 seconds
        
        # 10% are gold tasks
        is_gold = np.random.random() < 0.1
        gold_correct = np.random.random() < accuracy if is_gold else None
        
        # Log annotation
        timestamp_offset = pd.Timedelta(hours=task_num)
        tracker.metrics = pd.concat([
            tracker.metrics,
            pd.DataFrame([{
                'worker_id': f'W{worker_id:03d}',
                'task_id': f'T{task_num:05d}',
                'task_type': 'NER',

'time_seconds': time_taken,
                'quality_score': None,
                'is_gold': is_gold,
                'gold_correct': gold_correct,
                'timestamp': pd.Timestamp.now() - pd.Timedelta(days=30) + timestamp_offset
            }])
        ], ignore_index=True)

# Calculate worker statistics
worker_stats = tracker.calculate_worker_stats(lookback_days=30)
print("Worker Performance Statistics:")
print(worker_stats.sort_values('accuracy', ascending=False))

# Identify issues
issues = tracker.identify_issues(worker_stats)
print("\nIdentified Issues:")
for issue_type, workers in issues.items():
    if workers:
        print(f"\n{issue_type.upper()}:")
        for worker_issue in workers:
            print(f"  {worker_issue}")

# Plot performance for top and bottom worker
top_worker = worker_stats.nlargest(1, 'accuracy')['worker_id'].values[0]
tracker.plot_worker_performance(top_worker)

20.3 Payment Processing

20.3.1 Payment Calculator & Tracker

Code
from datetime import datetime, timedelta
from typing import Dict, List
import pandas as pd
import numpy as np

class PaymentProcessor:
    """
    Manage worker payments and invoicing
    """
    
    def __init__(self, payment_cycle: str = 'weekly'):
        """
        Args:
            payment_cycle: 'weekly', 'biweekly', or 'monthly'
        """
        self.payment_cycle = payment_cycle
        self.payment_ledger = pd.DataFrame()
        
    def calculate_worker_payment(self, worker_id: str,
                                 annotations: pd.DataFrame,
                                 piece_rates: Dict[str, float],
                                 bonus_structure: Dict = None) -> dict:
        """
        Calculate payment for worker
        
        Args:
            worker_id: Worker ID
            annotations: DataFrame with annotation logs
            piece_rates: Dict of {task_type: rate_per_task}
            bonus_structure: Optional bonus configuration
            
        Returns:
            Payment details dict
        """
        worker_annotations = annotations[annotations['worker_id'] == worker_id]
        
        if len(worker_annotations) == 0:
            return {
                'worker_id': worker_id,
                'total_tasks': 0,
                'base_payment': 0,
                'bonuses': 0,
                'total_payment': 0,
                'effective_hourly': 0,
                'tasks_by_type': {}
            }
        
        # Calculate base payment
        payment_details = {'tasks_by_type': {}}
        total_base = 0
        
        for task_type, group in worker_annotations.groupby('task_type'):
            n_tasks = len(group)
            rate = piece_rates.get(task_type, 0)
            amount = n_tasks * rate
            
            payment_details['tasks_by_type'][task_type] = {
                'count': n_tasks,
                'rate': rate,
                'amount': amount
            }
            total_base += amount
        
        payment_details['base_payment'] = total_base
        
        # Calculate bonuses
        bonuses = 0
        
        if bonus_structure:
            # Quality bonus
            if 'quality_bonus' in bonus_structure:
                gold_tasks = worker_annotations[worker_annotations['is_gold'] == True]
                if len(gold_tasks) > 0:
                    accuracy = gold_tasks['gold_correct'].mean()
                    if accuracy >= bonus_structure['quality_bonus']['threshold']:
                        bonus_amount = total_base * bonus_structure['quality_bonus']['percentage']
                        bonuses += bonus_amount
                        payment_details['quality_bonus'] = {
                            'accuracy': accuracy,
                            'amount': bonus_amount
                        }
            
            # Volume bonus
            if 'volume_bonus' in bonus_structure:
                total_tasks = len(worker_annotations)
                if total_tasks >= bonus_structure['volume_bonus']['threshold']:
                    bonuses += bonus_structure['volume_bonus']['amount']
                    payment_details['volume_bonus'] = bonus_structure['volume_bonus']['amount']
        
        payment_details['worker_id'] = worker_id
        payment_details['total_tasks'] = len(worker_annotations)
        payment_details['bonuses'] = bonuses
        payment_details['total_payment'] = total_base + bonuses
        payment_details['effective_hourly'] = (
            (total_base + bonuses) / (worker_annotations['time_seconds'].sum() / 3600)
            if worker_annotations['time_seconds'].sum() > 0 else 0
        )
        
        return payment_details
    
    def generate_payroll(self, annotations: pd.DataFrame,
                        piece_rates: Dict[str, float],
                        period_start: datetime = None,
                        period_end: datetime = None) -> pd.DataFrame:
        """
        Generate payroll for all workers
        
        Args:
            annotations: All annotation logs
            piece_rates: Piece rates by task type
            period_start: Start of payment period
            period_end: End of payment period
            
        Returns:
            DataFrame with payroll details
        """
        if period_start is None or period_end is None:
            # Use last payment cycle
            period_end = datetime.now()
            if self.payment_cycle == 'weekly':
                period_start = period_end - timedelta(days=7)
            elif self.payment_cycle == 'biweekly':
                period_start = period_end - timedelta(days=14)
            else:  # monthly
                period_start = period_end - timedelta(days=30)
        
        # Filter annotations to period
        period_annotations = annotations[
            (annotations['timestamp'] >= period_start) &
            (annotations['timestamp'] <= period_end)
        ]
        
        # Bonus structure
        bonus_structure = {
            'quality_bonus': {
                'threshold': 0.95,
                'percentage': 0.20
            },
            'volume_bonus': {
                'threshold': 1000,
                'amount': 50.0
            }
        }
        
        # Calculate payments for all workers
        payroll_list = []
        
        for worker_id in period_annotations['worker_id'].unique():
            payment = self.calculate_worker_payment(
                worker_id,
                period_annotations,
                piece_rates,
                bonus_structure
            )
            
            # Flatten the payment dict for DataFrame
            payroll_row = {
                'worker_id': payment['worker_id'],
                'total_tasks': payment['total_tasks'],
                'base_payment': payment['base_payment'],
                'bonuses': payment['bonuses'],
                'total_payment': payment['total_payment'],
                'effective_hourly': payment['effective_hourly'],
                'period_start': period_start,
                'period_end': period_end,
                'payment_cycle': self.payment_cycle,
                'payment_status': 'pending'
            }
            
            # Store full details separately for invoice generation
            payroll_row['_full_details'] = payment
            
            payroll_list.append(payroll_row)
        
        if not payroll_list:
            # Return empty DataFrame with correct columns
            return pd.DataFrame(columns=[
                'worker_id', 'total_tasks', 'base_payment', 'bonuses',
                'total_payment', 'effective_hourly', 'period_start',
                'period_end', 'payment_cycle', 'payment_status'
            ])
        
        payroll_df = pd.DataFrame(payroll_list)
        
        return payroll_df
    
    def generate_invoice(self, worker_id: str, payment_details: dict) -> str:
        """
        Generate invoice for worker
        
        Args:
            worker_id: Worker ID
            payment_details: Payment calculation dict
            
        Returns:
            Invoice as formatted string
        """
        invoice = f"""
╔════════════════════════════════════════════════════════════════╗
║                    ANNOTATION SERVICES INVOICE                 ║
╚════════════════════════════════════════════════════════════════╝

Worker ID: {worker_id}
Payment Period: {payment_details.get('period_start', 'N/A')} to {payment_details.get('period_end', 'N/A')}
Invoice Date: {datetime.now().strftime('%Y-%m-%d')}

────────────────────────────────────────────────────────────────

WORK COMPLETED:
"""
        
        for task_type, details in payment_details.get('tasks_by_type', {}).items():
            invoice += f"""
  {task_type}:
    Tasks Completed: {details['count']}
    Rate per Task: ${details['rate']:.3f}
    Subtotal: ${details['amount']:.2f}
"""
        
        invoice += f"""
────────────────────────────────────────────────────────────────

BASE PAYMENT:                                          ${payment_details['base_payment']:.2f}
"""
        
        if payment_details.get('quality_bonus'):
            invoice += f"""
QUALITY BONUS (Accuracy: {payment_details['quality_bonus']['accuracy']:.1%}):  ${payment_details['quality_bonus']['amount']:.2f}
"""
        
        if payment_details.get('volume_bonus'):
            invoice += f"""
VOLUME BONUS:                                          ${payment_details['volume_bonus']:.2f}
"""
        
        invoice += f"""
────────────────────────────────────────────────────────────────

TOTAL PAYMENT:                                         ${payment_details['total_payment']:.2f}

Effective Hourly Rate: ${payment_details['effective_hourly']:.2f}/hour

────────────────────────────────────────────────────────────────

Payment will be processed within 5 business days.
Questions? Contact: finance@annotationservices.com

Thank you for your valuable contributions!
"""
        
        return invoice

# Example usage
# First, create some sample annotation data
np.random.seed(42)

# Create sample tracker metrics
sample_data = []
for worker_id in ['W001', 'W002', 'W003']:
    for i in range(150):
        sample_data.append({
            'worker_id': worker_id,
            'task_id': f'T{i:05d}',
            'task_type': np.random.choice(['NER', 'Text Classification', 'Bounding Box']),
            'time_seconds': np.random.normal(45, 10),
            'quality_score': None,
            'is_gold': np.random.random() < 0.1,
            'gold_correct': np.random.random() < 0.96,
            'timestamp': datetime.now() - timedelta(days=np.random.randint(0, 7))
        })

annotations_df = pd.DataFrame(sample_data)

# Create processor
processor = PaymentProcessor(payment_cycle='weekly')

# Define piece rates
piece_rates = {
    'NER': 0.18,
    'Text Classification': 0.03,
    'Bounding Box': 0.10
}

# Generate payroll
payroll = processor.generate_payroll(
    annotations_df,
    piece_rates,
    period_start=datetime.now() - timedelta(days=7),
    period_end=datetime.now()
)

print("Payroll Summary:")
print(payroll[['worker_id', 'total_tasks', 'base_payment', 'bonuses', 'total_payment', 'effective_hourly']])

# Generate invoice for top earner
if len(payroll) > 0:
    top_earner = payroll.nlargest(1, 'total_payment').iloc[0]
    
    # Get full payment details from the stored details
    payment_details = top_earner['_full_details']
    payment_details['period_start'] = top_earner['period_start'].strftime('%Y-%m-%d')
    payment_details['period_end'] = top_earner['period_end'].strftime('%Y-%m-%d')
    
    invoice = processor.generate_invoice(top_earner['worker_id'], payment_details)
    print("\n" + invoice)
else:
    print("No payroll data for the period")

21 Complete System Architecture

21.1 System Overview

graph TB
    subgraph "Client Interface"
        ClientAPI[Client API]
        ClientDash[Client Dashboard]
    end
    
    subgraph "Core Platform"
        TaskMgmt[Task Management]
        Assignment[Task Assignment Engine]
        Workflow[Workflow Orchestrator]
        Quality[Quality Control]
    end
    
    subgraph "Annotation Interface"
        WebUI[Web Annotation UI]
        MobileUI[Mobile UI]
        API[Annotation API]
    end
    
    subgraph "ML Backend"
        PreAnnotation[Pre-annotation Models]
        ActiveLearning[Active Learning]
        Aggregation[Label Aggregation]
    end
    
    subgraph "Worker Management"
        WorkerDB[(Worker Database)]
        Performance[Performance Tracking]
        Payment[Payment System]
    end
    
    subgraph "Data Storage"
        RawData[(Raw Data Storage)]
        Annotations[(Annotation Database)]
        Backups[(Backup Storage)]
    end
    
    subgraph "External Services"
        S3[AWS S3]
        Stripe[Stripe Payment]
        Email[Email Service]
    end
    
    ClientAPI --> TaskMgmt
    ClientDash --> TaskMgmt
    TaskMgmt --> Assignment
    Assignment --> Workflow
    Workflow --> WebUI
    Workflow --> MobileUI
    Workflow --> API
    
    WebUI --> Annotations
    MobileUI --> Annotations
    API --> Annotations
    
    Annotations --> Quality
    Quality --> Aggregation
    Aggregation --> PreAnnotation
    PreAnnotation --> ActiveLearning
    
    Assignment --> WorkerDB
    Performance --> WorkerDB
    Payment --> WorkerDB
    Payment --> Stripe
    
    TaskMgmt --> RawData
    Annotations --> RawData
    RawData --> Backups
    Annotations --> Backups
    
    RawData --> S3
    Backups --> S3
    
    Workflow --> Email
Figure 21.1: Complete annotation platform architecture

21.2 Technology Stack

Table 21.1: Recommended technology stack for annotation platform
Code
import pandas as pd

tech_stack = pd.DataFrame({
    'Component': [
        'Web Framework',
        'API Framework',
        'Database (Primary)',
        'Database (Cache)',
        'Task Queue',
        'File Storage',
        'ML Framework',
        'Frontend',
        'Authentication',
        'Monitoring',
        'Analytics',
        'Payment Processing',
        'Email Service',
        'Deployment',
        'Load Balancer'
    ],
    'Technology': [
        'Django / FastAPI',
        'FastAPI',
        'PostgreSQL',
        'Redis',
        'Celery + RabbitMQ',
        'AWS S3 / MinIO',
        'PyTorch / TensorFlow',
        'React + TypeScript',
        'Auth0 / Django Auth',
        'Prometheus + Grafana',
        'Mixpanel / Amplitude',
        'Stripe',
        'SendGrid / Mailgun',
        'Docker + Kubernetes',
        'Nginx'
    ],
    'Purpose': [
        'Backend application framework',
        'RESTful API endpoints',
        'Relational data storage',
        'Session cache, task queue',
        'Async task processing',
        'Media file storage',
        'Model training/inference',
        'Annotation interface',
        'User authentication',
        'System health monitoring',
        'User behavior analytics',
        'Worker payments',
        'Automated emails',
        'Container orchestration',
        'Traffic distribution'
    ],
    'Alternatives': [
        'Flask, Node.js',
        'Django REST Framework',
        'MySQL, MongoDB',
        'Memcached',
        'Apache Airflow, AWS SQS',
        'Google Cloud Storage, Azure',
        'JAX, Scikit-learn',
        'Vue.js, Angular',
        'Keycloak, Firebase Auth',
        'Datadog, New Relic',
        'Google Analytics',
        'PayPal, Square',
        'Amazon SES',
        'AWS ECS, Google GKE',
        'HAProxy, AWS ALB'
    ]
})

tech_stack