class NERAnnotation:
    """
    Data structure for NER annotation.

    Stores a source text plus a list of non-overlapping entity spans. Each
    entity is a dict with 'start', 'end', 'label' and 'text' keys, where
    'start'/'end' are character offsets into the source text and 'end' is
    exclusive.
    """

    def __init__(self, text: str):
        self.text = text
        self.entities = []

    def add_entity(self, start: int, end: int, label: str, text: str) -> bool:
        """
        Add entity span

        Args:
            start: Character offset start (inclusive)
            end: Character offset end (exclusive)
            label: Entity type (PERSON, ORG, LOC, etc.)
            text: The actual text span

        Returns:
            True if the entity was stored. False if the span is empty or
            inverted, reaches outside the source text, or overlaps an
            entity added earlier.
        """
        # An empty or out-of-range span can never point at real text, and an
        # empty span would also slip through the overlap check below.
        if not self._is_valid_span(start, end):
            return False
        # Validate no overlap with existing entities
        if self._check_overlap(start, end):
            return False
        self.entities.append({
            'start': start,
            'end': end,
            'label': label,
            'text': text,
        })
        return True

    def _is_valid_span(self, start: int, end: int) -> bool:
        """Check that [start, end) is a non-empty range inside the text"""
        return 0 <= start < end <= len(self.text)

    def _check_overlap(self, start: int, end: int) -> bool:
        """Check if span overlaps with existing entities"""
        # Two half-open ranges intersect when each starts before the other ends.
        return any(
            start < entity['end'] and end > entity['start']
            for entity in self.entities
        )

    def to_dict(self):
        """Return the annotation as {'text': ..., 'entities': [...]}"""
        return {
            'text': self.text,
            'entities': self.entities,
        }
# Example usage
example_text = "Apple Inc. CEO Tim Cook announced new products in Cupertino."
annotation = NERAnnotation(example_text)
# Offsets are [start, end) character positions in example_text:
#   example_text[0:10]  == 'Apple Inc.'
#   example_text[15:23] == 'Tim Cook'
#   example_text[50:59] == 'Cupertino'
annotation.add_entity(0, 10, 'ORG', 'Apple Inc.')
annotation.add_entity(15, 23, 'PERSON', 'Tim Cook')
annotation.add_entity(50, 59, 'LOC', 'Cupertino')
print(annotation.to_dict())

16 Guide to Data Annotation
16.1 Industry Landscape
16.1.1 Major Players
| Company | Specialization | Business Model | Scale |
|---|---|---|---|
| Scale AI | Multi-modal, autonomous vehicles | Managed service | $7B+ valuation |
| Labelbox | Platform + services | Software + marketplace | Series D |
| Appen | Text, speech, image | Crowd platform | Public (ASX) |
| Amazon SageMaker Ground Truth | AWS-integrated | Platform | Part of AWS |
| Hive | Computer vision | API + human loop | Series C |
| Snorkel AI | Weak supervision, programmatic | Software platform | Series C |
| SuperAnnotate | Computer vision, LiDAR | Platform + services | Series B |
| Dataloop | MLOps + annotation | Integrated platform | Series C |
Table 16.1 shows major players in this market.
16.1.2 Open Source Tools
| Tool | Best For | Language |
|---|---|---|
| Label Studio | Multi-modal, flexible | Python/React |
| CVAT | Computer vision, video | Python/React |
| Prodigy | Text, active learning | Python |
| Doccano | Text annotation | Python/Vue |
| VoTT | Object detection | TypeScript |
| Labelme | Image segmentation | Python |
| BRAT | Text, NER | Python/JavaScript |
Table 16.2 lists the open-source annotation tools currently available.
Our Choice: Label Studio
Why Label Studio for this guide:
Multi-modal: Handles text, image, video, audio
Highly customizable: XML-based config
Active community: Regular updates
Production-ready: Used by major companies
ML integration: Pre-annotation, active learning
Self-hostable: Full control over data
16.2 Market Economics
16.2.1 Pricing Benchmarks (2024-2025)
Code
import pandas as pd
import plotly.graph_objects as go
import numpy as np
# Pricing data from market research.
# One row per task: (task type, low, median, high, complexity); prices in USD.
pricing_rows = [
    ('Text Classification', 0.01, 0.03, 0.08, 1),
    ('NER (per doc)', 0.05, 0.15, 0.40, 3),
    ('Sentiment Analysis', 0.02, 0.05, 0.12, 2),
    ('Image Classification', 0.02, 0.05, 0.15, 1),
    ('Bounding Box (per image)', 0.08, 0.25, 0.75, 3),
    ('Semantic Segmentation', 0.50, 1.50, 5.00, 5),
    ('Video Object Tracking (per min)', 2.00, 5.00, 15.00, 7),
    ('Audio Transcription (per min)', 0.80, 1.50, 3.00, 4),
    ('Medical Image Annotation', 3.00, 8.00, 20.00, 8),
    ('3D Point Cloud (LIDAR)', 5.00, 15.00, 40.00, 9),
]
pricing_data = pd.DataFrame(
    pricing_rows,
    columns=['Task Type', 'Low ($)', 'Median ($)', 'High ($)', 'Complexity'],
)

# One bar series per price tier, drawn side by side for each task type
fig = go.Figure()
for tier, shade in (('Low', 'lightblue'),
                    ('Median', 'steelblue'),
                    ('High', 'darkblue')):
    fig.add_trace(go.Bar(
        name=tier,
        x=pricing_data['Task Type'],
        y=pricing_data[f'{tier} ($)'],
        marker_color=shade,
    ))

# Log-scaled y axis: the price values range from 0.01 to 40.00
fig.update_layout(
    barmode='group',
    title='Market Pricing by Task Type (2024-2025)',
    xaxis_title='Task Type',
    yaxis_title='Price (USD)',
    yaxis_type='log',
    height=500,
    hovermode='x unified',
)
fig.show()

16.2.2 Cost Structure
Code
import plotly.graph_objects as go
# Share of the total service cost per category, in percent
cost_breakdown = [
    ('Labor (Annotators)', 40),
    ('QA/Review', 15),
    ('Platform/Infrastructure', 10),
    ('Management/PM', 12),
    ('Training/Onboarding', 5),
    ('Customer Support', 3),
    ('Overhead', 5),
    ('Profit Margin', 10),
]
costs = dict(cost_breakdown)

# One slice colour per category, in the same order as the categories above
slice_colors = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#FFA07A',
                '#98D8C8', '#F7DC6F', '#BB8FCE', '#85C1E2']

# Donut chart (hole=.3) of the cost shares
donut = go.Pie(
    labels=list(costs),
    values=list(costs.values()),
    hole=.3,
    marker_colors=slice_colors,
)
fig = go.Figure(data=[donut])
fig.update_layout(
    title='Typical Cost Structure for Annotation Service (%)',
    height=500,
)
fig.show()

17 Media Types & Annotation Tasks
17.1 Text Annotation
17.1.1 Task Taxonomy
Code
import pandas as pd
# Text annotation task taxonomy, one record per task:
# (task, annotation level, complexity, average seconds per item, typical kappa)
text_task_rows = [
    ('Text Classification', 'Document', 1, 5, 0.85),
    ('Named Entity Recognition', 'Token/Span', 3, 45, 0.75),
    ('Relation Extraction', 'Span Pair', 4, 60, 0.65),
    ('Sentiment Analysis', 'Document/Aspect', 2, 15, 0.70),
    ('Intent Detection', 'Utterance', 2, 10, 0.80),
    ('Slot Filling', 'Token', 3, 30, 0.75),
    ('Coreference Resolution', 'Multi-span', 5, 90, 0.60),
    ('Semantic Role Labeling', 'Sentence', 5, 120, 0.55),
    ('Question Answering', 'Span', 3, 40, 0.70),
    ('Summarization Quality', 'Document', 4, 60, 0.65),
]
text_tasks = pd.DataFrame(
    text_task_rows,
    columns=['Task', 'Level', 'Complexity', 'Avg Time (sec)',
             'Typical Agreement (κ)'],
)
text_tasks

17.1.2 Named Entity Recognition (NER)
Annotation Interface Requirements:
Common Edge Cases:
Code
import pandas as pd
# NER edge cases, one record per scenario:
# (scenario, example, resolution strategy, guideline priority)
edge_case_rows = [
    ('Nested entities',
     '[Bank of [America]_ORG]_ORG',
     'Annotate longest span only',
     'High'),
    ('Discontinuous mentions',
     'New York and Los Angeles (two separate LOC)',
     'Mark each entity separately',
     'Medium'),
    ('Ambiguous boundaries',
     'U.S. vs U.S vs US',
     'Normalize to consistent form',
     'High'),
    ('Coordinated entities',
     'Google and Facebook',
     'Mark each entity individually',
     'Medium'),
    ('Metonymy',
     'Wall Street (location vs financial industry)',
     'Use context to decide',
     'High'),
    ('Generic vs specific',
     'apple (fruit vs Apple company)',
     'Require capitalization for ORG',
     'High'),
]
edge_cases = pd.DataFrame(
    edge_case_rows,
    columns=['Scenario', 'Example', 'Resolution Strategy', 'Guideline Priority'],
)
edge_cases

Synthetic Training Data Generation:
import random
from typing import List, Tuple
class SyntheticNERGenerator:
    """
    Generate synthetic NER training data for testing annotation workflows

    Sentences are produced by filling ``{TYPE}`` placeholders in a template
    with randomly drawn entity strings; every filled placeholder is reported
    as an entity span with character offsets into the final sentence.
    """

    def __init__(self):
        self.templates = [
            "{PERSON} works at {ORG} in {LOC}.",
            "{ORG} announced that {PERSON} will lead the {ORG} division.",
            "The {ORG} headquarters in {LOC} employs {PERSON}.",
            "{PERSON} traveled from {LOC} to {LOC} for {ORG} business.",
        ]
        self.entities = {
            'PERSON': ['Alice Johnson', 'Bob Smith', 'Carol Martinez',
                       'David Lee', 'Emma Wilson', 'Frank Chen'],
            'ORG': ['TechCorp', 'DataSystems Inc.', 'Global Analytics',
                    'Innovation Labs', 'Future Solutions'],
            'LOC': ['New York', 'San Francisco', 'London',
                    'Singapore', 'Berlin', 'Tokyo']
        }

    def generate(self, n: int = 10) -> List[dict]:
        """
        Generate n synthetic examples

        Args:
            n: Number of examples to produce

        Returns:
            List of {'text': str, 'entities': [...]} dicts. Each entity has
            'start', 'end' (exclusive), 'label' and 'text', listed in order
            of appearance, and text[start:end] equals the entity text.

        Raises:
            ValueError: if a template uses a type more often than there are
                distinct strings for it (raised by random.sample).
        """
        import re  # local import keeps the snippet self-contained

        placeholder_pattern = re.compile(r'\{(\w+)\}')
        examples = []
        for _ in range(n):
            template = random.choice(self.templates)

            # Draw distinct fillers per type; the k-th occurrence of a
            # placeholder receives the k-th sampled string for its type.
            fillers = {
                entity_type: iter(random.sample(
                    pool, template.count(f'{{{entity_type}}}')
                ))
                for entity_type, pool in self.entities.items()
            }

            # Fill placeholders strictly left to right. Offsets are measured
            # against the text emitted so far, so a substitution can never
            # shift the recorded position of an entity to its left.
            pieces = []
            annotations = []
            cursor = 0       # read position in the template
            emitted = 0      # length of the output text built so far
            for match in placeholder_pattern.finditer(template):
                entity_type = match.group(1)
                if entity_type not in fillers:
                    continue  # unknown placeholder stays in the text as-is
                literal = template[cursor:match.start()]
                entity_text = next(fillers[entity_type])
                pieces.append(literal)
                emitted += len(literal)
                annotations.append({
                    'start': emitted,
                    'end': emitted + len(entity_text),
                    'label': entity_type,
                    'text': entity_text
                })
                pieces.append(entity_text)
                emitted += len(entity_text)
                cursor = match.end()
            pieces.append(template[cursor:])

            examples.append({
                'text': ''.join(pieces),
                'entities': annotations
            })
        return examples
# Build a generator and draw five synthetic sentences from it
generator = SyntheticNERGenerator()
synthetic_examples = generator.generate(n=5)
# Print each sample under a 1-based running number
for i, example in enumerate(synthetic_examples, start=1):
    print(f"\nExample {i}:")
    print(f"Text: {example['text']}")
    print(f"Entities: {example['entities']}")

17.2 Image Annotation
17.2.1 Task Taxonomy
Code
import plotly.graph_objects as go
import numpy as np
# One record per image task:
# (task, complexity 1-10, time in seconds, precision required 1-10)
task_profiles = [
    ('Image Classification', 1, 5, 3),
    ('Bounding Box', 3, 20, 6),
    ('Polygon Segmentation', 5, 120, 8),
    ('Semantic Segmentation', 6, 300, 9),
    ('Instance Segmentation', 7, 400, 9),
    ('Keypoint Annotation', 4, 60, 7),
    ('Panoptic Segmentation', 8, 450, 10),
    ('3D Cuboid', 6, 90, 8),
]
tasks = [profile[0] for profile in task_profiles]
complexity = np.array([profile[1] for profile in task_profiles])
time_required = np.array([profile[2] for profile in task_profiles])
precision = np.array([profile[3] for profile in task_profiles])

# Hover text: task name, complexity (x) and time (y)
hover_text = ''.join([
    '<b>%{text}</b><br>',
    'Complexity: %{x}<br>',
    'Time: %{y} sec<br>',
    '<extra></extra>',
])

# Marker size and colour both encode the precision requirement
bubble_style = dict(
    size=precision * 5,
    color=precision,
    colorscale='Viridis',
    showscale=True,
    colorbar=dict(title="Precision<br>Required"),
)

fig = go.Figure()
fig.add_trace(go.Scatter(
    x=complexity,
    y=time_required,
    mode='markers+text',
    marker=bubble_style,
    text=tasks,
    textposition='top center',
    hovertemplate=hover_text,
))
fig.update_layout(
    title='Image Annotation Task Space',
    xaxis_title='Task Complexity (1-10)',
    yaxis_title='Average Time Required (seconds)',
    height=600,
    width=900,
)
fig.show()

17.2.2 Bounding Box Annotation
Data Structure:
from dataclasses import dataclass
from typing import List, Tuple
import json
@dataclass
class BoundingBox:
    """
    Axis-aligned box stored as top-left corner plus size.

    Conversions offered:
    - XYXY: (x_min, y_min, x_max, y_max)
    - XYWH: (x, y, width, height)
    - COCO: [x, y, width, height] (top-left corner)
    - YOLO: (x_center, y_center, width, height) normalized
    """
    x: float
    y: float
    width: float
    height: float
    label: str
    confidence: float = 1.0
    image_width: int = None
    image_height: int = None

    def to_xyxy(self) -> Tuple[float, float, float, float]:
        """Return the corners as (x_min, y_min, x_max, y_max)"""
        right = self.x + self.width
        bottom = self.y + self.height
        return (self.x, self.y, right, bottom)

    def to_xywh(self) -> Tuple[float, float, float, float]:
        """Return (x, y, width, height)"""
        return (self.x, self.y, self.width, self.height)

    def to_yolo(self) -> Tuple[float, float, float, float]:
        """
        Return (x_center, y_center, width, height) divided by the image size.

        Raises:
            ValueError: if image_width or image_height is not set
        """
        img_w, img_h = self.image_width, self.image_height
        if img_w is None or img_h is None:
            raise ValueError("Image dimensions required for YOLO format")
        center_x = (self.x + self.width / 2) / img_w
        center_y = (self.y + self.height / 2) / img_h
        return (center_x, center_y, self.width / img_w, self.height / img_h)

    def to_coco(self) -> dict:
        """Return a dict with 'bbox', 'category' and 'score' entries"""
        coco_entry = {}
        coco_entry['bbox'] = [self.x, self.y, self.width, self.height]
        coco_entry['category'] = self.label
        coco_entry['score'] = self.confidence
        return coco_entry

    def area(self) -> float:
        """Return width times height"""
        return self.width * self.height

    def iou(self, other: 'BoundingBox') -> float:
        """
        Intersection over Union with another box.

        Used for:
        - Agreement between annotators
        - Matching predicted vs ground truth
        - Non-maximum suppression

        Returns 0.0 for disjoint boxes and when the union has no area.
        """
        ax0, ay0, ax1, ay1 = self.to_xyxy()
        bx0, by0, bx1, by1 = other.to_xyxy()

        # Extent of the overlap rectangle along each axis
        overlap_w = min(ax1, bx1) - max(ax0, bx0)
        overlap_h = min(ay1, by1) - max(ay0, by0)
        if overlap_w < 0 or overlap_h < 0:
            return 0.0

        shared = overlap_w * overlap_h
        combined = self.area() + other.area() - shared
        if combined > 0:
            return shared / combined
        return 0.0
# Example usage: two overlapping 'person' boxes on a 1920x1080 frame
frame_size = {'image_width': 1920, 'image_height': 1080}
box1 = BoundingBox(x=100, y=100, width=200, height=150, label='person',
                   **frame_size)
box2 = BoundingBox(x=120, y=110, width=180, height=140, label='person',
                   **frame_size)
# Show the first box in corner form and in normalized YOLO form
xyxy_form = box1.to_xyxy()
yolo_form = box1.to_yolo()
print(f"Box 1 (XYXY): {xyxy_form}")
print(f"Box 1 (YOLO): {yolo_form}")
print(f"IoU between boxes: {box1.iou(box2):.3f}")

Quality Metrics for Bounding Boxes:
import numpy as np
from typing import List
from scipy.optimize import linear_sum_assignment
class BoundingBoxQualityMetrics:
    """
    Calculate quality metrics for bounding box annotations
    """

    @staticmethod
    def calculate_agreement(
        annotator1_boxes: List[BoundingBox],
        annotator2_boxes: List[BoundingBox],
        iou_threshold: float = 0.5
    ) -> dict:
        """
        Calculate agreement between two annotators

        Boxes are paired one-to-one (same label only) so that the total IoU
        is maximal; a pair counts as a match when its IoU reaches
        ``iou_threshold``. Precision is matches / len(annotator1_boxes) and
        recall is matches / len(annotator2_boxes).

        Returns:
            dict with precision, recall, f1, mean_iou, matches,
            total_boxes_ann1 and total_boxes_ann2. The same keys are
            returned for empty inputs: all scores are 1.0 when both
            annotators drew nothing and 0.0 when only one did.
        """
        n1 = len(annotator1_boxes)
        n2 = len(annotator2_boxes)

        if n1 == 0 or n2 == 0:
            # Nothing can be paired; keep the result shape identical to the
            # regular path so callers can read every key unconditionally.
            score = 1.0 if n1 == n2 else 0.0
            return {
                'precision': score,
                'recall': score,
                'f1': score,
                'mean_iou': score,
                'matches': 0,
                'total_boxes_ann1': n1,
                'total_boxes_ann2': n2
            }

        # Build IoU matrix; cross-label pairs stay at 0 so they never match
        iou_matrix = np.zeros((n1, n2))
        for i, box1 in enumerate(annotator1_boxes):
            for j, box2 in enumerate(annotator2_boxes):
                if box1.label == box2.label:
                    iou_matrix[i, j] = box1.iou(box2)

        # Hungarian algorithm minimises cost, so negate to maximise IoU
        row_ind, col_ind = linear_sum_assignment(-iou_matrix)

        # Keep only assigned pairs that clear the threshold
        matched_ious = [
            iou_matrix[i, j]
            for i, j in zip(row_ind, col_ind)
            if iou_matrix[i, j] >= iou_threshold
        ]
        matches = len(matched_ious)

        precision = matches / n1
        recall = matches / n2
        f1 = (2 * (precision * recall) / (precision + recall)
              if (precision + recall) > 0 else 0)
        mean_iou = float(np.mean(matched_ious)) if matched_ious else 0.0

        return {
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'mean_iou': mean_iou,
            'matches': matches,
            'total_boxes_ann1': n1,
            'total_boxes_ann2': n2
        }

    @staticmethod
    def aggregate_boxes(
        boxes_list: List[List[BoundingBox]],
        iou_threshold: float = 0.5,
        min_consensus: int = 2
    ) -> List[BoundingBox]:
        """
        Aggregate bounding boxes from multiple annotators

        Strategy: greedily cluster same-label boxes around a seed box by
        IoU, admitting at most one box per annotator into a cluster, and
        keep clusters backed by at least ``min_consensus`` annotators.

        Args:
            boxes_list: One list of boxes per annotator
            iou_threshold: Minimum IoU with the seed box to join its cluster
            min_consensus: Minimum number of annotators per kept cluster

        Returns:
            One averaged box per kept cluster; ``confidence`` is the share
            of annotators that contributed to the cluster (at most 1.0).
        """
        if not boxes_list:
            return []

        # Flatten all boxes with annotator ID
        all_boxes = [
            (annotator_id, box)
            for annotator_id, boxes in enumerate(boxes_list)
            for box in boxes
        ]
        if not all_boxes:
            return []

        # Cluster boxes
        clusters = []
        used = set()
        for i, (seed_annotator, seed_box) in enumerate(all_boxes):
            if i in used:
                continue
            cluster = [(seed_annotator, seed_box)]
            # One vote per annotator: without this, two boxes by the same
            # person could fake consensus and push confidence above 1.0.
            voters = {seed_annotator}
            used.add(i)
            for j, (annotator_id, candidate) in enumerate(all_boxes):
                if j in used or annotator_id in voters:
                    continue
                if (seed_box.label == candidate.label
                        and seed_box.iou(candidate) >= iou_threshold):
                    cluster.append((annotator_id, candidate))
                    voters.add(annotator_id)
                    used.add(j)
            if len(cluster) >= min_consensus:
                clusters.append(cluster)

        # Average boxes in each cluster
        aggregated = []
        for cluster in clusters:
            members = [box for _, box in cluster]
            seed = members[0]
            aggregated.append(BoundingBox(
                x=float(np.mean([box.x for box in members])),
                y=float(np.mean([box.y for box in members])),
                width=float(np.mean([box.width for box in members])),
                height=float(np.mean([box.height for box in members])),
                label=seed.label,
                confidence=len(cluster) / len(boxes_list),
                # carry the frame size along so to_yolo() keeps working
                image_width=seed.image_width,
                image_height=seed.image_height
            ))
        return aggregated
# Example: Calculate agreement
# First annotator drew a car and a person
ann1_boxes = [
    BoundingBox(x, y, w, h, label)
    for x, y, w, h, label in [
        (100, 100, 200, 150, 'car'),
        (400, 200, 150, 100, 'person'),
    ]
]
# Second annotator drew the same two objects plus a bicycle
ann2_boxes = [
    BoundingBox(x, y, w, h, label)
    for x, y, w, h, label in [
        (105, 105, 195, 145, 'car'),
        (420, 210, 140, 95, 'person'),
        (700, 300, 80, 120, 'bicycle'),
    ]
]
metrics = BoundingBoxQualityMetrics.calculate_agreement(
    annotator1_boxes=ann1_boxes,
    annotator2_boxes=ann2_boxes,
)
print("Agreement Metrics:")
for key, value in metrics.items():
    print(f" {key}: {value:.3f}" if isinstance(value, float) else f" {key}: {value}")

17.2.3 Segmentation Masks
Polygon Representation:
import numpy as np
from shapely.geometry import Polygon
from shapely.validation import make_valid
import cv2
class SegmentationMask:
    """
    Represent segmentation masks in multiple formats

    An instance is backed either by a polygon or by a binary mask array and
    can be converted to polygon points, a binary mask, or run-length
    encoding.
    """

    def __init__(self, polygon_points: List[Tuple[int, int]] = None,
                 mask_array: np.ndarray = None, label: str = None):
        """
        Initialize from either polygon points or binary mask

        Args:
            polygon_points: List of (x, y) tuples
            mask_array: Binary numpy array (H, W)
            label: Class label

        Raises:
            ValueError: if neither input is given, or if the repaired
                polygon has no polygonal part
        """
        self.label = label
        self._rle = None
        if polygon_points is not None:
            polygon = Polygon(polygon_points)
            if not polygon.is_valid:
                # make_valid may return a multi-part geometry that has no
                # single exterior ring; keep its largest polygon instead.
                polygon = self._largest_polygon(make_valid(polygon))
            self.polygon = polygon
            self._mask = None
        elif mask_array is not None:
            self._mask = mask_array.astype(np.uint8)
            self.polygon = None
        else:
            raise ValueError("Must provide either polygon_points or mask_array")

    @staticmethod
    def _largest_polygon(geometry):
        """Return the largest-area Polygon contained in a repaired geometry"""
        pending = [geometry]
        polygons = []
        while pending:
            part = pending.pop()
            if part.geom_type == 'Polygon':
                polygons.append(part)
            elif hasattr(part, 'geoms'):
                pending.extend(part.geoms)  # multi-part: inspect members
        if not polygons:
            raise ValueError("polygon_points do not describe a polygon")
        return max(polygons, key=lambda poly: poly.area)

    def to_polygon(self) -> List[Tuple[int, int]]:
        """
        Get polygon coordinates

        For mask-backed instances the largest external contour is returned;
        an empty list means the mask has no contour.
        """
        if self.polygon is not None:
            return list(self.polygon.exterior.coords)
        # Extract contours from mask
        contours, _ = cv2.findContours(
            self._mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )
        if not contours:
            return []
        # Return largest contour
        largest = max(contours, key=cv2.contourArea)
        return [(int(pt[0][0]), int(pt[0][1])) for pt in largest]

    def to_mask(self, height: int, width: int) -> np.ndarray:
        """
        Convert to binary mask

        Polygon-backed instances are rasterized at (height, width) and the
        result is cached; asking for a different size re-rasterizes.
        Mask-backed instances return the stored array as-is, whatever size
        is requested.
        """
        cached = self._mask
        if cached is not None and (
            self.polygon is None or cached.shape == (height, width)
        ):
            return cached
        # Rasterize polygon
        mask = np.zeros((height, width), dtype=np.uint8)
        points = np.array(self.to_polygon(), dtype=np.int32)
        if points.size:
            cv2.fillPoly(mask, [points], 1)
        self._mask = mask
        return mask

    def to_rle(self, height: int, width: int) -> dict:
        """
        Convert to Run-Length Encoding (COCO uncompressed format)

        More compact storage for masks.

        Returns:
            {'counts': [...], 'size': [H, W]} where ``counts`` holds the
            lengths of alternating background/foreground runs over the
            column-major flattened mask. The first run is always
            background, so it is 0 when the mask starts with a set pixel.
        """
        mask = self.to_mask(height, width)
        # Flatten mask in Fortran order (column-major)
        pixels = mask.T.flatten() != 0
        # Positions where the pixel value flips delimit the runs
        flips = np.flatnonzero(pixels[1:] != pixels[:-1]) + 1
        boundaries = np.concatenate([[0], flips, [pixels.size]])
        counts = np.diff(boundaries).tolist()
        if pixels.size and pixels[0]:
            counts.insert(0, 0)  # zero-length leading background run
        return {
            'counts': counts,
            'size': [int(mask.shape[0]), int(mask.shape[1])]
        }

    def area(self) -> float:
        """Calculate mask area (polygon area, or number of set pixels)"""
        if self.polygon is not None:
            return self.polygon.area
        elif self._mask is not None:
            return np.sum(self._mask)
        return 0

    def iou(self, other: 'SegmentationMask', height: int, width: int) -> float:
        """Calculate IoU with another mask (0.0 when both are empty)"""
        mask1 = self.to_mask(height, width)
        mask2 = other.to_mask(height, width)
        intersection = np.logical_and(mask1, mask2).sum()
        union = np.logical_or(mask1, mask2).sum()
        return intersection / union if union > 0 else 0.0

    def dice_coefficient(self, other: 'SegmentationMask',
                         height: int, width: int) -> float:
        """
        Calculate Dice coefficient (F1 for segmentation)

        Dice = 2 * |A ∩ B| / (|A| + |B|)
        """
        mask1 = self.to_mask(height, width)
        mask2 = other.to_mask(height, width)
        intersection = np.logical_and(mask1, mask2).sum()
        sum_areas = mask1.sum() + mask2.sum()
        return 2 * intersection / sum_areas if sum_areas > 0 else 0.0
# Example usage: a closed 100x100 square on a 300x300 canvas
polygon_points = [(100, 100), (200, 100), (200, 200), (100, 200), (100, 100)]
seg_mask = SegmentationMask(polygon_points=polygon_points, label='person')
# Convert to different formats
canvas = {'height': 300, 'width': 300}
binary_mask = seg_mask.to_mask(**canvas)
rle = seg_mask.to_rle(**canvas)
mask_shape = binary_mask.shape
print(f"Mask shape: {mask_shape}")
print(f"Mask area: {seg_mask.area()}")
print(f"RLE counts (first 10): {rle['counts'][:10]}")

17.3 Video Annotation
17.3.1 Challenges Unique to Video
Code
import pandas as pd
# Video-specific annotation challenges, one record per challenge:
# (challenge, impact on quality, mitigation strategy, cost impact)
video_challenge_rows = [
    ('Temporal Consistency',
     'High - ID switches common',
     'Track review UI with temporal context',
     '3-5x vs single frame'),
    ('Occlusion Handling',
     'High - Lost tracks',
     'Flag occlusion states explicitly',
     '1.5x (review time)'),
    ('Object Re-identification',
     'High - Same object different IDs',
     'Visual similarity matching tools',
     '2x (manual correction)'),
    ('Motion Blur',
     'Medium - Boundary uncertainty',
     'Multiple frame context',
     '1.2x (slower annotation)'),
    ('Scale Variation',
     'Medium - Small object detection',
     'Consistent zoom level',
     '1.3x (zoom overhead)'),
    ('Annotation Volume',
     'High - Prohibitive manual effort',
     'Keyframe + interpolation workflow',
     '10-30x (30 fps video)'),
    ('Keyframe Selection',
     'Medium - Miss important frames',
     'Scene change detection',
     '0.5x (reduces frames)'),
    ('Interpolation Accuracy',
     'Medium - Drift between keyframes',
     'Optical flow-based interpolation',
     '0.3x (auto-fills frames)'),
]
video_challenges = pd.DataFrame(
    video_challenge_rows,
    columns=['Challenge', 'Impact on Quality', 'Mitigation Strategy',
             'Cost Impact'],
)
video_challenges

17.3.2 Object Tracking Data Structure
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from enum import Enum
class ObjectState(Enum):
    """Visibility state that can be attached to a detection."""

    VISIBLE = "visible"
    OCCLUDED = "occluded"
    OUT_OF_FRAME = "out_of_frame"
    UNCERTAIN = "uncertain"
@dataclass
class TrackedObject:
    """
    Single object tracked across multiple frames

    ``frames`` maps a frame number to a dict with 'bbox', 'state' and
    'keyframe' entries.
    """
    track_id: int
    label: str
    frames: Dict[int, dict] = field(default_factory=dict)

    def add_detection(self, frame_num: int, bbox: BoundingBox,
                      state: ObjectState = ObjectState.VISIBLE,
                      keyframe: bool = False):
        """
        Add detection at specific frame (replaces any existing entry)

        Args:
            frame_num: Frame number
            bbox: Bounding box at this frame
            state: Visibility state
            keyframe: Is this a manually annotated keyframe?
        """
        self.frames[frame_num] = {
            'bbox': bbox,
            'state': state,
            'keyframe': keyframe
        }

    def interpolate_frames(self, start_frame: int, end_frame: int,
                           method: str = 'linear'):
        """
        Interpolate bounding boxes between keyframes

        Args:
            start_frame: Start frame (must be annotated)
            end_frame: End frame (must be annotated)
            method: 'linear' ('cubic' is reserved but not implemented)

        Raises:
            ValueError: if either frame is not annotated, or the method is
                unknown
            NotImplementedError: if method is 'cubic'
        """
        if start_frame not in self.frames or end_frame not in self.frames:
            raise ValueError("Both start and end frames must be annotated")
        if method == 'linear':
            self._linear_interpolation(start_frame, end_frame)
        elif method == 'cubic':
            # No cubic routine exists on this class; fail with a clear
            # message instead of an AttributeError on a missing method.
            raise NotImplementedError(
                "Cubic interpolation is not implemented; use method='linear'"
            )
        else:
            raise ValueError(f"Unknown interpolation method: {method}")

    def _linear_interpolation(self, start_frame: int, end_frame: int):
        """
        Linear interpolation of bounding boxes

        Fills every frame strictly between start_frame and end_frame.
        Frames already marked as keyframes are left untouched so manual
        annotations are never overwritten. Does nothing when end_frame is
        not after start_frame.
        """
        start_bbox = self.frames[start_frame]['bbox']
        end_bbox = self.frames[end_frame]['bbox']
        num_frames = end_frame - start_frame
        for i in range(1, num_frames):
            frame_num = start_frame + i
            existing = self.frames.get(frame_num)
            if existing is not None and existing['keyframe']:
                continue  # keep the manual annotation
            alpha = i / num_frames
            # Interpolate each coordinate
            x = start_bbox.x + alpha * (end_bbox.x - start_bbox.x)
            y = start_bbox.y + alpha * (end_bbox.y - start_bbox.y)
            w = start_bbox.width + alpha * (end_bbox.width - start_bbox.width)
            h = start_bbox.height + alpha * (end_bbox.height - start_bbox.height)
            interpolated_bbox = BoundingBox(
                x=x, y=y, width=w, height=h, label=self.label,
                # carry the frame size along so to_yolo() keeps working
                image_width=start_bbox.image_width,
                image_height=start_bbox.image_height
            )
            self.add_detection(
                frame_num,
                interpolated_bbox,
                state=ObjectState.VISIBLE,
                keyframe=False
            )

    def get_trajectory(self) -> List[Tuple[int, BoundingBox]]:
        """Get sorted list of (frame, bbox) tuples"""
        return sorted(
            [(f, d['bbox']) for f, d in self.frames.items()],
            key=lambda x: x[0]
        )

    def coverage(self, total_frames: int) -> float:
        """
        Calculate the fraction (0 to 1) of frames that have annotations

        Returns 0.0 when total_frames is not positive.
        """
        if total_frames <= 0:
            return 0.0
        return len(self.frames) / total_frames
@dataclass
class VideoAnnotation:
    """
    All object tracks annotated for one video, plus the video's metadata.
    """
    video_id: str
    fps: float
    total_frames: int
    width: int
    height: int
    tracks: Dict[int, TrackedObject] = field(default_factory=dict)
    _next_track_id: int = field(default=0, init=False)

    def create_track(self, label: str, first_frame: int,
                     bbox: BoundingBox) -> int:
        """
        Start a new track whose first detection is a keyframe.

        Args:
            label: Label given to the new track
            first_frame: Frame number of the first detection
            bbox: Bounding box at that frame

        Returns:
            track_id of created track
        """
        new_id = self._next_track_id
        self._next_track_id = new_id + 1
        new_track = TrackedObject(track_id=new_id, label=label)
        new_track.add_detection(first_frame, bbox, keyframe=True)
        self.tracks[new_id] = new_track
        return new_id

    def get_frame_annotations(self, frame_num: int) -> List[Tuple[int, BoundingBox]]:
        """Return (track_id, bbox) for every track annotated at frame_num"""
        return [
            (track_id, track.frames[frame_num]['bbox'])
            for track_id, track in self.tracks.items()
            if frame_num in track.frames
        ]

    def export_to_mot_format(self, output_path: str):
        """
        Write all tracks to a text file in MOT Challenge layout.

        Format: <frame>, <id>, <bb_left>, <bb_top>, <bb_width>, <bb_height>, <conf>, <x>, <y>, <z>

        Rows are grouped by track and ordered by frame within each track.
        """
        with open(output_path, 'w') as f:
            for track_id, track in self.tracks.items():
                for frame_num in sorted(track.frames):
                    bbox = track.frames[frame_num]['bbox']
                    # MOT format uses 1-indexed frames
                    row = (frame_num + 1, track_id, bbox.x, bbox.y,
                           bbox.width, bbox.height, 1, -1, -1, -1)
                    f.write(",".join(f"{value}" for value in row) + "\n")
# Example usage: a 30-second clip at 30 fps
video_ann = VideoAnnotation(
    video_id="traffic_001",
    fps=30.0,
    total_frames=900,
    width=1920,
    height=1080,
)
# Create track for a car, starting at frame 0
first_box = BoundingBox(100, 200, 150, 100, "car")
car_track_id = video_ann.create_track(label="car", first_frame=0, bbox=first_box)
car_track = video_ann.tracks[car_track_id]
# Add keyframe at frame 30
car_track.add_detection(
    frame_num=30,
    bbox=BoundingBox(250, 220, 160, 110, "car"),
    keyframe=True,
)
# Interpolate frames 0-30
car_track.interpolate_frames(0, 30, method='linear')
# Get all annotations at frame 15
frame_15_anns = video_ann.get_frame_annotations(15)
object_count = len(frame_15_anns)
print(f"Frame 15 has {object_count} objects")
print(f"Car position at frame 15: {frame_15_anns[0][1].to_xywh()}")

17.3.3 Video Annotation Workflow
flowchart TD
A[Load Video] --> B[Automatic Scene Detection]
B --> C[Present Keyframes to Annotator]
C --> D[Annotator Labels Objects in Keyframes]
D --> E{Object Exits/Enters Scene?}
E -->|Yes| F[Mark Entry/Exit Frames]
E -->|No| G[Continue to Next Keyframe]
F --> G
G --> H[Automatic Interpolation]
H --> I[Annotator Reviews Interpolated Frames]
I --> J{Quality OK?}
J -->|No| K[Add Correction Keyframe]
K --> H
J -->|Yes| L[Mark Track as Complete]
L --> M{More Objects?}
M -->|Yes| D
M -->|No| N[Export Annotations]
17.4 Audio Annotation
17.4.1 Task Taxonomy
Code
import pandas as pd
# Audio annotation task taxonomy.
# The price column is a cost in USD per minute of audio, so it is labelled
# "Avg Cost" rather than "Avg Time".
audio_tasks = pd.DataFrame({
    'Task': [
        'Speech Transcription',
        'Speaker Diarization',
        'Emotion Recognition',
        'Intent Classification',
        'Sound Event Detection',
        'Music Tagging',
        'Audio Quality Assessment',
        'Wake Word Detection'
    ],
    'Granularity': [
        'Utterance',
        'Segment',
        'Utterance',
        'Utterance',
        'Event',
        'Track',
        'Track',
        'Segment'
    ],
    'Complexity': [3, 4, 3, 2, 3, 2, 2, 2],
    'Avg Cost ($/min audio)': [1.50, 2.00, 0.50, 0.30, 1.00, 0.40, 0.30, 0.40],
    'Special Requirements': [
        'Domain vocabulary',
        'Multi-speaker handling',
        'Cultural context',
        'Domain knowledge',
        'Sound taxonomy',
        'Music knowledge',
        'Audio expertise',
        'Precision timing'
    ]
})
audio_tasks

17.4.2 Transcription Data Structure
from dataclasses import dataclass
from typing import List, Optional
import re
@dataclass
class TranscriptionSegment:
    """
    Single segment of transcribed audio
    """
    start_time: float  # seconds
    end_time: float  # seconds
    text: str
    speaker_id: Optional[str] = None
    confidence: float = 1.0

    def duration(self) -> float:
        """Segment duration in seconds"""
        return self.end_time - self.start_time

    def word_count(self) -> int:
        """Count whitespace-separated words in segment"""
        return len(self.text.split())

    def speaking_rate(self) -> float:
        """Words per minute (0.0 for segments without positive duration)"""
        seconds = self.duration()
        if seconds <= 0:
            # Zero or inverted timing would give a division error or a
            # negative rate; report "no rate" instead.
            return 0.0
        return (self.word_count() / seconds) * 60


@dataclass
class AudioTranscription:
    """
    Complete transcription of audio file
    """
    audio_id: str
    duration: float  # total audio duration
    segments: List[TranscriptionSegment]

    @staticmethod
    def _timestamp_parts(seconds: float) -> tuple:
        """
        Split seconds into (hours, minutes, seconds, milliseconds)

        Rounds to whole milliseconds first and then splits with integer
        arithmetic. This avoids float truncation (3.8 s becoming 799 ms)
        and roll-over artifacts such as a seconds field of 60.
        """
        total_ms = max(0, round(seconds * 1000))
        total_seconds, millis = divmod(total_ms, 1000)
        total_minutes, secs = divmod(total_seconds, 60)
        hours, minutes = divmod(total_minutes, 60)
        return hours, minutes, secs, millis

    def total_words(self) -> int:
        """Total word count"""
        return sum(seg.word_count() for seg in self.segments)

    def average_speaking_rate(self) -> float:
        """Average words per minute across all segments (0.0 if no speech time)"""
        total_duration = sum(seg.duration() for seg in self.segments)
        if total_duration <= 0:
            return 0.0
        return (self.total_words() / total_duration) * 60

    def get_speaker_segments(self, speaker_id: str) -> List[TranscriptionSegment]:
        """Get all segments for a specific speaker"""
        return [seg for seg in self.segments if seg.speaker_id == speaker_id]

    def to_srt(self) -> str:
        """
        Export to SRT subtitle format

        Format:
        1
        00:00:00,000 --> 00:00:02,500
        First subtitle text

        2
        00:00:02,500 --> 00:00:05,000
        Second subtitle text

        Segments with a speaker_id are prefixed with "[speaker_id] ".
        """
        def format_timestamp(seconds: float) -> str:
            hours, minutes, secs, millis = self._timestamp_parts(seconds)
            return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"

        srt_output = []
        for i, seg in enumerate(self.segments, 1):
            srt_output.append(f"{i}")
            srt_output.append(
                f"{format_timestamp(seg.start_time)} --> "
                f"{format_timestamp(seg.end_time)}"
            )
            if seg.speaker_id:
                srt_output.append(f"[{seg.speaker_id}] {seg.text}")
            else:
                srt_output.append(seg.text)
            srt_output.append("")  # Blank line between entries
        return "\n".join(srt_output)

    def to_vtt(self) -> str:
        """
        Export to WebVTT format

        Segments with a speaker_id are wrapped in a <v speaker_id> voice tag.
        """
        def format_timestamp(seconds: float) -> str:
            hours, minutes, secs, millis = self._timestamp_parts(seconds)
            return f"{hours:02d}:{minutes:02d}:{secs:02d}.{millis:03d}"

        vtt_output = ["WEBVTT", ""]
        for seg in self.segments:
            vtt_output.append(
                f"{format_timestamp(seg.start_time)} --> "
                f"{format_timestamp(seg.end_time)}"
            )
            if seg.speaker_id:
                vtt_output.append(f"<v {seg.speaker_id}>{seg.text}</v>")
            else:
                vtt_output.append(seg.text)
            vtt_output.append("")
        return "\n".join(vtt_output)
# Example usage: a short two-speaker exchange from a 3-minute recording
# Each entry is (start seconds, end seconds, text, speaker)
dialogue = [
    (0.0, 3.5, "Hello, thank you for joining us today.", "SPEAKER_1"),
    (3.8, 6.2, "Thanks for having me.", "SPEAKER_2"),
    (6.5, 12.1, "Let's start with your background in machine learning.",
     "SPEAKER_1"),
]
transcription = AudioTranscription(
    audio_id="interview_001",
    duration=180.0,
    segments=[
        TranscriptionSegment(
            start_time=begin,
            end_time=finish,
            text=spoken,
            speaker_id=speaker,
        )
        for begin, finish, spoken, speaker in dialogue
    ],
)
word_total = transcription.total_words()
print(f"Total words: {word_total}")
print(f"Average speaking rate: {transcription.average_speaking_rate():.1f} WPM")
print("\nSRT format:")
print(transcription.to_srt()[:200] + "...")

17.4.3 Transcription Quality Metrics
import Levenshtein
from typing import List, Tuple
class TranscriptionQualityMetrics:
    """
    Calculate quality metrics for transcriptions
    """

    @staticmethod
    def word_error_rate(reference: str, hypothesis: str) -> float:
        """
        Calculate Word Error Rate (WER)

        WER = (Substitutions + Deletions + Insertions) / Total Words in Reference
        Standard metric for ASR evaluation. Comparison is case-insensitive.

        Returns:
            0.0 when both strings have no words, inf when only the
            reference has none, otherwise the word-level edit distance
            divided by the number of reference words.
        """
        ref_words = reference.lower().split()
        hyp_words = hypothesis.lower().split()
        # Settle the empty-reference case before any distance computation
        if len(ref_words) == 0:
            return 0.0 if len(hyp_words) == 0 else float('inf')
        # Levenshtein distance at word level
        distance = Levenshtein.distance(ref_words, hyp_words)
        return distance / len(ref_words)

    @staticmethod
    def character_error_rate(reference: str, hypothesis: str) -> float:
        """
        Calculate Character Error Rate (CER)

        More fine-grained than WER. Comparison is case-insensitive.

        Returns:
            0.0 when both strings are empty, inf when only the reference
            is empty, otherwise the character-level edit distance divided
            by the reference length.
        """
        if len(reference) == 0:
            return 0.0 if len(hypothesis) == 0 else float('inf')
        distance = Levenshtein.distance(reference.lower(), hypothesis.lower())
        return distance / len(reference)

    @staticmethod
    def calculate_agreement(
        transcriptions: List[str],
        use_wer: bool = True
    ) -> dict:
        """
        Calculate inter-annotator agreement for transcriptions

        Every pair (i, j) with i < j is scored with transcription i as the
        reference, and agreement is 1 - error rate clamped at 0.

        Args:
            transcriptions: List of transcription strings
            use_wer: Use WER (True) or CER (False)

        Returns:
            dict with 'mean_agreement', 'pairwise' and 'metric'; fewer than
            two transcriptions yields perfect agreement and no pairs.
        """
        metric_name = 'WER' if use_wer else 'CER'
        if len(transcriptions) < 2:
            return {'mean_agreement': 1.0, 'pairwise': [], 'metric': metric_name}

        metric_func = (TranscriptionQualityMetrics.word_error_rate if use_wer
                       else TranscriptionQualityMetrics.character_error_rate)
        pairwise_errors = []
        n = len(transcriptions)
        for i in range(n):
            for j in range(i + 1, n):
                error_rate = metric_func(transcriptions[i], transcriptions[j])
                agreement = 1 - error_rate  # Convert error to agreement
                pairwise_errors.append({
                    'transcriber_1': i,
                    'transcriber_2': j,
                    'agreement': max(0.0, agreement),  # error rate can exceed 1
                    'error_rate': error_rate
                })

        # Plain arithmetic mean; n >= 2 guarantees at least one pair
        mean_agreement = (
            sum(pair['agreement'] for pair in pairwise_errors)
            / len(pairwise_errors)
        )
        return {
            'mean_agreement': mean_agreement,
            'pairwise': pairwise_errors,
            'metric': metric_name
        }

    @staticmethod
    def calculate_majority_vote(transcriptions: List[str]) -> str:
        """
        Find consensus transcription using edit distance

        Returns transcription with minimum total distance to all others
        (case-insensitive); the earliest one wins ties. An empty input
        gives "".
        """
        if not transcriptions:
            return ""
        if len(transcriptions) == 1:
            return transcriptions[0]

        min_total_distance = float('inf')
        consensus = transcriptions[0]
        for candidate in transcriptions:
            total_distance = sum(
                Levenshtein.distance(candidate.lower(), other.lower())
                for other in transcriptions if other != candidate
            )
            if total_distance < min_total_distance:
                min_total_distance = total_distance
                consensus = candidate
        return consensus
# Example usage: one reference sentence and three single-word variations
reference = "The quick brown fox jumps over the lazy dog"
hypothesis1 = "The quick brown fox jumped over the lazy dog"
hypothesis2 = "The quick brown fox jumps over a lazy dog"
hypothesis3 = "The quick braun fox jumps over the lazy dog"
# Calculate WER of each hypothesis against the reference
wer1, wer2, wer3 = (
    TranscriptionQualityMetrics.word_error_rate(reference, hypothesis)
    for hypothesis in (hypothesis1, hypothesis2, hypothesis3)
)
for number, wer in enumerate((wer1, wer2, wer3), start=1):
    print(f"WER (hypothesis {number}): {wer:.3f}")
# Calculate inter-annotator agreement
transcriptions = [reference, hypothesis1, hypothesis2, hypothesis3]
agreement = TranscriptionQualityMetrics.calculate_agreement(transcriptions)
mean_agreement = agreement['mean_agreement']
print(f"\nMean agreement: {mean_agreement:.3f}")
# Find consensus
consensus = TranscriptionQualityMetrics.calculate_majority_vote(transcriptions)
print(f"\nConsensus transcription: {consensus}")

18 Quality Assurance & Metrics
18.1 Inter-Rater Reliability
18.1.1 Overview of IRR Metrics
Code
import pandas as pd
# Overview table of inter-rater reliability metrics, written one row per
# metric. The values in each row follow the order of the `columns` list.
irr_metrics = pd.DataFrame(
    [
        ('Percent Agreement', 'Categorical', '≥2', 'No', 'No',
         'Quick check (not recommended)'),
        ("Cohen's Kappa", 'Categorical', '2', 'Yes', 'No',
         'Traditional research'),
        ("Fleiss' Kappa", 'Categorical', '≥2', 'Yes', 'No',
         'Traditional research (>2 raters)'),
        ("Krippendorff's Alpha", 'Any scale', '≥2', 'Yes', 'Yes',
         'Research (most flexible)'),
        ('ICC (Intraclass Correlation)', 'Continuous', '≥2', 'Partial',
         'Yes (some variants)', 'Continuous ratings'),
        ('F1 Score', 'Binary/Multi-class', '2', 'No', 'No',
         'ML evaluation'),
        ('Dice Coefficient', 'Binary (segmentation)', '2', 'No', 'No',
         'Segmentation evaluation'),
        ('IoU (Jaccard Index)', 'Binary (detection)', '2', 'No', 'No',
         'Detection evaluation'),
    ],
    columns=[
        'Metric',
        'Data Type',
        'Raters',
        'Accounts for Chance',
        'Missing Data',
        'Use Case',
    ],
)
irr_metrics
18.1.2 Implementation: Traditional Metrics
import numpy as np
from typing import List, Union
from scipy.stats import kendalltau
import pandas as pd
class InterRaterReliability:
    """
    Calculate various inter-rater reliability metrics

    Every method is static and accepts array-likes (lists are converted
    with np.asarray before any element-wise comparison).
    """

    @staticmethod
    def percent_agreement(rater1: np.ndarray, rater2: np.ndarray) -> float:
        """
        Simple percent agreement (not recommended - doesn't account for chance)
        Args:
            rater1: Array of ratings from rater 1
            rater2: Array of ratings from rater 2
        Returns:
            Proportion of agreement (0 to 1)
        Raises:
            ValueError: If the two arrays differ in length
        """
        # Plain lists compared with == yield a single bool, not a mask
        rater1 = np.asarray(rater1)
        rater2 = np.asarray(rater2)
        if len(rater1) != len(rater2):
            raise ValueError("Rater arrays must be same length")
        return np.mean(rater1 == rater2)

    @staticmethod
    def cohens_kappa(rater1: np.ndarray, rater2: np.ndarray) -> float:
        """
        Cohen's Kappa for two raters
        κ = (p_o - p_e) / (1 - p_e)
        where:
        - p_o = observed agreement
        - p_e = expected agreement by chance
        Interpretation:
        - < 0: Less than chance agreement
        - 0.01-0.20: Slight agreement
        - 0.21-0.40: Fair agreement
        - 0.41-0.60: Moderate agreement
        - 0.61-0.80: Substantial agreement
        - 0.81-1.00: Almost perfect agreement
        Raises:
            ValueError: If the two arrays differ in length
        """
        rater1 = np.asarray(rater1)
        rater2 = np.asarray(rater2)
        if len(rater1) != len(rater2):
            raise ValueError("Rater arrays must be same length")
        # Observed agreement
        p_o = np.mean(rater1 == rater2)
        # Expected agreement: product of the two raters' marginal rates,
        # summed over every category either rater used
        categories = np.unique(np.concatenate([rater1, rater2]))
        p_e = 0
        for category in categories:
            p_e += np.mean(rater1 == category) * np.mean(rater2 == category)
        # Both raters used one single category: kappa is 0/0, report 1.0
        if p_e == 1:
            return 1.0
        return (p_o - p_e) / (1 - p_e)

    @staticmethod
    def fleiss_kappa(ratings: np.ndarray) -> float:
        """
        Fleiss' Kappa for multiple raters
        Args:
            ratings: 2D array of shape (n_items, n_raters)
        Returns:
            Fleiss' kappa coefficient
        Raises:
            ValueError: If fewer than 2 raters are given (the per-item
                agreement divides by n_raters - 1)
        """
        ratings = np.asarray(ratings)
        n_items, n_raters = ratings.shape
        if n_raters < 2:
            raise ValueError("Fleiss' kappa requires at least 2 raters")
        # Get unique categories
        categories = np.unique(ratings)
        # Count category assignments per item
        category_counts = np.zeros((n_items, len(categories)))
        for i, category in enumerate(categories):
            category_counts[:, i] = np.sum(ratings == category, axis=1)
        # Calculate P_i (extent of agreement for item i)
        P_i = (np.sum(category_counts ** 2, axis=1) - n_raters) / (n_raters * (n_raters - 1))
        # Mean observed agreement
        P_bar = np.mean(P_i)
        # Expected agreement from the overall category proportions
        p_j = np.sum(category_counts, axis=0) / (n_items * n_raters)
        P_e_bar = np.sum(p_j ** 2)
        # Only one category in the data: kappa is 0/0, report 1.0
        if P_e_bar == 1:
            return 1.0
        return (P_bar - P_e_bar) / (1 - P_e_bar)

    @staticmethod
    def _distance_matrix(categories: np.ndarray, marginals: np.ndarray,
                         level_of_measurement: str) -> np.ndarray:
        """
        Build Krippendorff's squared difference matrix delta^2[c, k]

        Args:
            categories: Sorted unique rating values
            marginals: Row sums of the coincidence matrix, one per category
            level_of_measurement: 'nominal', 'ordinal', 'interval', or 'ratio'
        Returns:
            Square matrix with a zero diagonal
        Raises:
            ValueError: For an unknown level of measurement
        """
        values_c = categories[:, None]
        values_k = categories[None, :]
        if level_of_measurement == 'nominal':
            return (values_c != values_k).astype(float)
        if level_of_measurement == 'interval':
            return (values_c - values_k) ** 2
        if level_of_measurement == 'ratio':
            # ((c - k) / (c + k))^2; pairs summing to zero are left at 0,
            # which only arises for values a ratio scale does not allow
            diff = values_c - values_k
            total = values_c + values_k
            scaled = np.zeros(diff.shape, dtype=float)
            np.divide(diff, total, out=scaled, where=total != 0)
            return scaled ** 2
        if level_of_measurement == 'ordinal':
            # (sum of marginals from rank c through rank k, minus half of the
            # two end marginals)^2 - depends on how often each rank was used
            idx = np.arange(len(categories))
            lo = np.minimum.outer(idx, idx)
            hi = np.maximum.outer(idx, idx)
            cumulative = np.cumsum(marginals)
            span = cumulative[hi] - cumulative[lo] + marginals[lo]
            return (span - (marginals[:, None] + marginals[None, :]) / 2) ** 2
        raise ValueError(f"Unknown level of measurement: {level_of_measurement}")

    @staticmethod
    def krippendorff_alpha(ratings: np.ndarray, level_of_measurement: str = 'nominal') -> float:
        """
        Krippendorff's Alpha - most flexible reliability measure
        Handles:
        - Missing data
        - Any number of raters
        - Different levels of measurement
        Args:
            ratings: 2D array of shape (n_raters, n_items)
                Use np.nan for missing values
            level_of_measurement: 'nominal', 'ordinal', 'interval', or 'ratio'
        Returns:
            Alpha coefficient; np.nan when no item has at least two ratings,
            1.0 when the expected disagreement is zero
        Raises:
            ValueError: For an unknown level of measurement
        """
        ratings = np.asarray(ratings, dtype=float)
        n_raters, n_items = ratings.shape
        if level_of_measurement not in ('nominal', 'ordinal', 'interval', 'ratio'):
            raise ValueError(f"Unknown level of measurement: {level_of_measurement}")
        # Create coincidence matrix
        observed = ~np.isnan(ratings)
        categories = np.unique(ratings[observed])
        n_categories = len(categories)
        coincidence_matrix = np.zeros((n_categories, n_categories))
        # Count coincidences
        for item_idx in range(n_items):
            item_ratings = ratings[observed[:, item_idx], item_idx]
            m_u = len(item_ratings)
            # An item needs two or more ratings to form a pair
            if m_u < 2:
                continue
            counts = np.array(
                [np.sum(item_ratings == c) for c in categories], dtype=float
            )
            # Ordered pairs of values within the item; a value is never
            # paired with itself, hence the diagonal correction
            pair_counts = np.outer(counts, counts)
            pair_counts[np.diag_indices(n_categories)] -= counts
            coincidence_matrix += pair_counts / (m_u - 1)
        # Calculate observed and expected disagreement
        n_values = np.sum(coincidence_matrix)
        if n_values == 0:
            return np.nan
        marginals = coincidence_matrix.sum(axis=1)
        delta = InterRaterReliability._distance_matrix(
            categories, marginals, level_of_measurement
        )
        D_o = np.sum(coincidence_matrix * delta) / n_values
        D_e = np.sum(np.outer(marginals, marginals) * delta) / (n_values * (n_values - 1))
        # Krippendorff's alpha
        if D_e == 0:
            return 1.0
        return 1 - (D_o / D_e)

    @staticmethod
    def icc(ratings: np.ndarray, icc_type: str = 'ICC(2,1)') -> float:
        """
        Intraclass Correlation Coefficient
        Args:
            ratings: 2D array of shape (n_subjects, n_raters)
            icc_type: Type of ICC
                - ICC(1,1): Each subject rated by different raters
                - ICC(2,1): Random sample of raters
                - ICC(3,1): Fixed set of raters
        Returns:
            ICC value
        Raises:
            ValueError: For an unknown ICC type, or fewer than 2 subjects
                or 2 raters (the mean squares would divide by zero)
        """
        ratings = np.asarray(ratings, dtype=float)
        n_subjects, n_raters = ratings.shape
        if n_subjects < 2 or n_raters < 2:
            raise ValueError("ICC requires at least 2 subjects and 2 raters")
        grand_mean = np.mean(ratings)
        subject_means = np.mean(ratings, axis=1)
        rater_means = np.mean(ratings, axis=0)
        # Sums of squares of the two-way layout
        SS_between = n_raters * np.sum((subject_means - grand_mean) ** 2)
        SS_within = np.sum((ratings - subject_means[:, None]) ** 2)
        SS_raters = n_subjects * np.sum((rater_means - grand_mean) ** 2)
        # Residual = within-subject variation not explained by rater effects
        SS_error = SS_within - SS_raters
        # Mean squares
        MS_between = SS_between / (n_subjects - 1)
        MS_within = SS_within / (n_subjects * (n_raters - 1))
        MS_raters = SS_raters / (n_raters - 1)
        MS_error = SS_error / ((n_subjects - 1) * (n_raters - 1))
        if icc_type == 'ICC(1,1)':
            return (MS_between - MS_within) / (MS_between + (n_raters - 1) * MS_within)
        if icc_type == 'ICC(2,1)':
            return (MS_between - MS_error) / (
                MS_between
                + (n_raters - 1) * MS_error
                + n_raters * (MS_raters - MS_error) / n_subjects
            )
        if icc_type == 'ICC(3,1)':
            return (MS_between - MS_error) / (MS_between + (n_raters - 1) * MS_error)
        raise ValueError(f"Unknown ICC type: {icc_type}")
# Example usage with synthetic data
np.random.seed(42)  # fixed seed so the printed metrics are reproducible
# Binary classification task - 100 examples, 3 raters
n_examples = 100
n_raters = 3
# Generate ratings with some agreement
base_truth = np.random.randint(0, 2, n_examples)
# Laid out as (n_raters, n_examples): one row per rater
ratings_matrix = np.zeros((n_raters, n_examples), dtype=int)
for i in range(n_raters):
    # Each rater agrees with truth 80% of the time
    ratings_matrix[i] = base_truth.copy()
    # Entries whose uniform draw exceeds 0.8 are the ones that get flipped
    flip_mask = np.random.random(n_examples) > 0.8
    ratings_matrix[i, flip_mask] = 1 - ratings_matrix[i, flip_mask]  # binary flip: 0 <-> 1
# Calculate metrics
irr = InterRaterReliability()
# Pairwise Cohen's kappa
kappa_01 = irr.cohens_kappa(ratings_matrix[0], ratings_matrix[1])
kappa_02 = irr.cohens_kappa(ratings_matrix[0], ratings_matrix[2])
kappa_12 = irr.cohens_kappa(ratings_matrix[1], ratings_matrix[2])
print("Cohen's Kappa (pairwise):")
print(f" Rater 0-1: {kappa_01:.3f}")
print(f" Rater 0-2: {kappa_02:.3f}")
print(f" Rater 1-2: {kappa_12:.3f}")
# Fleiss' kappa
# fleiss_kappa documents its input as (n_items, n_raters), hence the transpose
fleiss = irr.fleiss_kappa(ratings_matrix.T)
print(f"\nFleiss' Kappa: {fleiss:.3f}")
# Krippendorff's alpha
# krippendorff_alpha documents its input as (n_raters, n_items): no transpose
alpha = irr.krippendorff_alpha(ratings_matrix, level_of_measurement='nominal')
print(f"Krippendorff's Alpha: {alpha:.3f}")
18.1.3 ML-Specific Agreement Metrics
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support
import matplotlib.pyplot as plt
import seaborn as sns
class MLAnnotationMetrics:
    """
    Metrics specific to ML annotation quality

    Every method is static. Annotation matrices are laid out as
    (n_examples, n_annotators).
    """

    @staticmethod
    def multi_annotator_confusion_matrix(
        gold_labels: np.ndarray,
        annotator_labels: List[np.ndarray],
        labels: List[str]
    ) -> dict:
        """
        Create confusion matrices for each annotator vs gold standard
        Args:
            gold_labels: Ground truth labels, encoded as 0..len(labels)-1
            annotator_labels: List of label arrays, one per annotator
            labels: List of label names
        Returns:
            Dict keyed 'annotator_<i>' with the confusion matrix, macro
            precision/recall/F1 and accuracy of each annotator
        """
        # The same label set is used for the matrix and the macro averages,
        # so classes missing from a sample still count in both
        label_ids = list(range(len(labels)))
        results = {}
        for i, ann_labels in enumerate(annotator_labels):
            cm = confusion_matrix(gold_labels, ann_labels, labels=label_ids)
            precision, recall, f1, _ = precision_recall_fscore_support(
                gold_labels, ann_labels, labels=label_ids,
                average='macro', zero_division=0
            )
            results[f'annotator_{i}'] = {
                'confusion_matrix': cm,
                'precision': precision,
                'recall': recall,
                'f1': f1,
                'accuracy': np.mean(gold_labels == ann_labels)
            }
        return results

    @staticmethod
    def entropy_disagreement(annotations: np.ndarray) -> np.ndarray:
        """
        Calculate entropy of annotations per example
        High entropy = high disagreement = difficult example
        Args:
            annotations: 2D array (n_examples, n_annotators)
        Returns:
            Array of entropy values (in bits) per example; exactly 0 when
            all annotators agree
        """
        annotations = np.asarray(annotations)
        n_examples, n_annotators = annotations.shape
        entropies = np.zeros(n_examples)
        for i in range(n_examples):
            _, counts = np.unique(annotations[i], return_counts=True)
            # np.unique only reports labels that occur, so every probability
            # is strictly positive and the log needs no epsilon
            probs = counts / n_annotators
            # "0.0 - x" rather than "-x" so unanimous rows give 0.0, not -0.0
            entropies[i] = 0.0 - np.sum(probs * np.log2(probs))
        return entropies

    @staticmethod
    def identify_difficult_examples(
        annotations: np.ndarray,
        threshold: float = 0.5,
        min_annotators: int = 3
    ) -> np.ndarray:
        """
        Identify examples with high disagreement
        Args:
            annotations: 2D array (n_examples, n_annotators)
            threshold: Entropy threshold for "difficult", applied to the
                entropy divided by log2(n_annotators)
            min_annotators: Minimum annotators required
        Returns:
            Boolean array indicating difficult examples
        Raises:
            ValueError: If fewer than min_annotators columns are present
        """
        annotations = np.asarray(annotations)
        if annotations.shape[1] < min_annotators:
            raise ValueError(f"Need at least {min_annotators} annotators")
        entropies = MLAnnotationMetrics.entropy_disagreement(annotations)
        # Entropy is largest when every annotator picks a different label
        max_possible_entropy = np.log2(annotations.shape[1])
        if max_possible_entropy == 0:
            # A single annotator cannot disagree with anyone
            return np.zeros(annotations.shape[0], dtype=bool)
        normalized_entropy = entropies / max_possible_entropy
        return normalized_entropy > threshold

    @staticmethod
    def annotator_skill_estimation(
        annotations: np.ndarray,
        gold_labels: np.ndarray = None
    ) -> pd.DataFrame:
        """
        Estimate annotator skill levels
        If gold labels available: direct accuracy comparison
        If not: use peer agreement as proxy
        Args:
            annotations: 2D array (n_examples, n_annotators)
            gold_labels: Optional ground truth labels
        Returns:
            DataFrame with one row per annotator: 'annotator_id',
            'total_annotations', and either 'accuracy_vs_gold' or
            'peer_agreement'
        Raises:
            ValueError: If no gold labels are given and there are fewer
                than 2 annotators (no peers to compare against)
        """
        annotations = np.asarray(annotations)
        n_examples, n_annotators = annotations.shape
        if gold_labels is None and n_annotators < 2:
            raise ValueError("Peer agreement requires at least 2 annotators")
        metrics = []
        for i in range(n_annotators):
            annotator_labels = annotations[:, i]
            if gold_labels is not None:
                # Direct accuracy
                accuracy = np.mean(annotator_labels == gold_labels)
                metric_name = 'accuracy_vs_gold'
            else:
                # Agreement with the majority vote of the other annotators
                peers = np.delete(annotations, i, axis=1)
                majority_vote = []
                for peer_labels in peers:
                    values, counts = np.unique(peer_labels, return_counts=True)
                    # argmax takes the first maximum, so ties resolve to the
                    # smallest label
                    majority_vote.append(values[np.argmax(counts)])
                majority_vote = np.array(majority_vote)
                accuracy = np.mean(annotator_labels == majority_vote)
                metric_name = 'peer_agreement'
            # Consistency (self-agreement on duplicated examples would go here)
            metrics.append({
                'annotator_id': i,
                metric_name: accuracy,
                'total_annotations': n_examples
            })
        return pd.DataFrame(metrics)
# Example usage
np.random.seed(42)  # fixed seed so the simulated annotations are reproducible
# Simulate annotation scenario
n_examples = 200
n_annotators = 5
n_classes = 3
# Generate "true" labels with varying difficulty
difficulty = np.random.beta(2, 5, n_examples)  # Most examples are "easy"
true_labels = np.random.randint(0, n_classes, n_examples)
# Generate annotations based on difficulty
annotations = np.zeros((n_examples, n_annotators), dtype=int)
for i in range(n_examples):
    # Harder examples have more disagreement
    error_rate = difficulty[i] * 0.5  # Up to 50% error on hardest examples
    for j in range(n_annotators):
        if np.random.random() < error_rate:
            # Make error: pick uniformly among the labels that are not the true one
            wrong_labels = [l for l in range(n_classes) if l != true_labels[i]]
            annotations[i, j] = np.random.choice(wrong_labels)
        else:
            annotations[i, j] = true_labels[i]
# Calculate metrics
ml_metrics = MLAnnotationMetrics()
# Identify difficult examples
difficult = ml_metrics.identify_difficult_examples(annotations, threshold=0.3)
print(f"Difficult examples: {np.sum(difficult)} / {n_examples} ({np.sum(difficult)/n_examples*100:.1f}%)")
# Estimate annotator skills
# Gold labels are passed, so the accuracy-vs-gold branch is used
annotator_skills = ml_metrics.annotator_skill_estimation(annotations, true_labels)
print("\nAnnotator Skills:")
print(annotator_skills)
# Calculate entropy for all examples
entropies = ml_metrics.entropy_disagreement(annotations)
# Plot entropy distribution
fig, ax = plt.subplots(1, 1, figsize=(10, 5))
ax.hist(entropies, bins=30, edgecolor='black', alpha=0.7)
# Dashed vertical line marks the mean entropy
ax.axvline(np.mean(entropies), color='red', linestyle='--', label=f'Mean: {np.mean(entropies):.3f}')
ax.set_xlabel('Entropy (Disagreement)')
ax.set_ylabel('Number of Examples')
ax.set_title('Distribution of Annotation Disagreement')
ax.legend()
plt.tight_layout()
# The figure is written to the working directory and then closed
plt.savefig('entropy_distribution.png', dpi=150, bbox_inches='tight')
plt.close()
print("\nEntropy statistics:")
print(f" Mean: {np.mean(entropies):.3f}")
print(f" Std: {np.std(entropies):.3f}")
print(f" Min: {np.min(entropies):.3f}")
print(f" Max: {np.max(entropies):.3f}")
18.2 Dawid-Skene Model
18.2.1 Theory
The Dawid-Skene model is a probabilistic approach to aggregate labels from multiple annotators, accounting for varying annotator quality.
Model components:
- True labels (\(T_i\)): Unknown ground truth for item \(i\)
- Observed labels (\(L_{ij}\)): Label from annotator \(j\) for item \(i\)
- Prior (\(\pi_k\)): Probability that true label is class \(k\)
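- Error rates (\(\theta_{jkl}\)): Probability that annotator \(j\) assigns label \(l\) to an item whose true class is \(k\); for each annotator these form a confusion matrix whose diagonal \(\theta_{jkk}\) is the per-class accuracy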
Likelihood:
\[P(L_{ij} = l | T_i = k, \theta_j) = \theta_{jkl}\]
Estimation via EM:
E-step: Estimate posterior probability of true label
\[P(T_i = k | L_i, \theta) \propto \pi_k \prod_{j=1}^{J} \theta_{jk,L_{ij}}\]
M-step: Update parameters
\[\pi_k = \frac{1}{I}\sum_{i=1}^{I} P(T_i = k | L_i, \theta)\]
\[\theta_{jkl} = \frac{\sum_{i=1}^{I} P(T_i = k | L_i, \theta) \mathbb{1}[L_{ij} = l]}{\sum_{i=1}^{I} P(T_i = k | L_i, \theta)}\]
18.2.2 Implementation
import numpy as np
from typing import Dict, List, Tuple
import pandas as pd
class DawidSkeneModel:
    """
    Dawid-Skene model for multi-annotator label aggregation
    Estimates:
    1. True labels for each example
    2. Error rates for each annotator
    3. Class priors

    Annotations are integer class ids in [0, n_classes); -1 marks a missing
    annotation. Probability products are evaluated in log space so that a
    large number of annotators, or error rates of exactly zero, cannot turn
    the posterior into 0/0.
    """

    def __init__(self, n_classes: int, n_annotators: int):
        """
        Args:
            n_classes: Number of label classes
            n_annotators: Number of annotators
        """
        self.n_classes = n_classes
        self.n_annotators = n_annotators
        # Parameters to estimate
        self.class_priors = np.ones(n_classes) / n_classes
        # error_rates[j, k, l] = P(annotator j reports l | true class k)
        self.error_rates = np.zeros((n_annotators, n_classes, n_classes))
        # Initialize error rates with diagonal dominance (rows sum to 1)
        for j in range(n_annotators):
            self.error_rates[j] = np.eye(n_classes) * 0.7 + 0.3 / n_classes
        # Posterior over true labels, shape (n_examples, n_classes) after fit
        self.true_label_probs = None

    def fit(self, annotations: np.ndarray, max_iter: int = 100,
            tol: float = 1e-4, verbose: bool = False) -> 'DawidSkeneModel':
        """
        Fit Dawid-Skene model using EM algorithm
        Args:
            annotations: 2D array (n_examples, n_annotators)
                Use -1 for missing annotations
            max_iter: Maximum EM iterations
            tol: Convergence tolerance on the log-likelihood change
            verbose: Print iteration info
        Returns:
            self
        Raises:
            ValueError: If annotations is not an integer 2D array with
                n_annotators columns and values in [-1, n_classes)
        """
        annotations = self._validate_annotations(annotations)
        n_examples = annotations.shape[0]
        # Initialize true label probabilities
        self.true_label_probs = np.ones((n_examples, self.n_classes)) / self.n_classes
        prev_likelihood = -np.inf
        for iteration in range(max_iter):
            # E-step: Estimate true labels
            self._e_step(annotations)
            # M-step: Update parameters
            self._m_step(annotations)
            # Calculate likelihood
            likelihood = self._calculate_likelihood(annotations)
            if verbose and iteration % 10 == 0:
                print(f"Iteration {iteration}: Likelihood = {likelihood:.4f}")
            # Check convergence
            if abs(likelihood - prev_likelihood) < tol:
                if verbose:
                    print(f"Converged after {iteration} iterations")
                break
            prev_likelihood = likelihood
        return self

    def _validate_annotations(self, annotations: np.ndarray) -> np.ndarray:
        """Return annotations as an array after checking shape, dtype and label range"""
        annotations = np.asarray(annotations)
        if annotations.ndim != 2 or annotations.shape[1] != self.n_annotators:
            raise ValueError(
                f"annotations must have shape (n_examples, {self.n_annotators})"
            )
        if not np.issubdtype(annotations.dtype, np.integer):
            raise ValueError("annotations must contain integer labels (-1 for missing)")
        # Labels are used as array indices: anything below -1 would silently
        # index from the end, anything >= n_classes would be out of bounds
        if annotations.size and (
            annotations.min() < -1 or annotations.max() >= self.n_classes
        ):
            raise ValueError(
                f"labels must be -1 (missing) or in [0, {self.n_classes})"
            )
        return annotations

    def _log_joint(self, annotations: np.ndarray) -> np.ndarray:
        """
        Log of prior times annotator likelihoods, per example and class

        Returns:
            Array (n_examples, n_classes) holding
            log pi_k + sum_j log theta[j, k, L_ij] over observed annotations
        """
        # Floor at the smallest positive float so log() stays finite
        tiny = np.finfo(float).tiny
        log_prior = np.log(np.maximum(self.class_priors, tiny))
        log_rates = np.log(np.maximum(self.error_rates, tiny))
        log_joint = np.tile(log_prior, (annotations.shape[0], 1))
        for j in range(self.n_annotators):
            labels = annotations[:, j]
            seen = labels != -1  # Skip missing annotations
            # Column l of log_rates[j] is the likelihood of observing l under
            # each true class; transpose to (n_seen, n_classes)
            log_joint[seen] += log_rates[j][:, labels[seen]].T
        return log_joint

    def _e_step(self, annotations: np.ndarray):
        """E-step: Estimate posterior probabilities of true labels"""
        log_joint = self._log_joint(annotations)
        # Subtracting the row maximum before exponentiating leaves the
        # normalized result unchanged and keeps one entry per row at 1
        log_joint -= log_joint.max(axis=1, keepdims=True)
        posterior = np.exp(log_joint)
        posterior /= posterior.sum(axis=1, keepdims=True)
        self.true_label_probs = posterior

    def _m_step(self, annotations: np.ndarray):
        """M-step: Update model parameters"""
        # Update class priors
        self.class_priors = np.mean(self.true_label_probs, axis=0)
        # Update error rates
        identity = np.eye(self.n_classes)
        for j in range(self.n_annotators):
            labels = annotations[:, j]
            seen = labels != -1
            posteriors = self.true_label_probs[seen]   # (n_seen, n_classes)
            observed_one_hot = identity[labels[seen]]  # (n_seen, n_classes)
            # soft_counts[k, l]: posterior mass of true class k on items
            # that annotator j labelled l
            soft_counts = posteriors.T @ observed_one_hot
            class_mass = posteriors.sum(axis=0)
            # Default to uniform if no data
            rates = np.full((self.n_classes, self.n_classes), 1.0 / self.n_classes)
            has_mass = class_mass > 0
            rates[has_mass] = soft_counts[has_mass] / class_mass[has_mass, None]
            self.error_rates[j] = rates

    def _calculate_likelihood(self, annotations: np.ndarray) -> float:
        """Calculate log-likelihood of current parameters"""
        log_joint = self._log_joint(annotations)
        # Log-sum-exp over classes gives each example's log marginal likelihood
        row_max = log_joint.max(axis=1, keepdims=True)
        log_evidence = row_max[:, 0] + np.log(np.exp(log_joint - row_max).sum(axis=1))
        return float(log_evidence.sum())

    def predict(self) -> np.ndarray:
        """
        Get predicted true labels
        Returns:
            Array of predicted labels
        Raises:
            ValueError: If called before fit()
        """
        if self.true_label_probs is None:
            raise ValueError("Model has not been fitted; call fit() first")
        return np.argmax(self.true_label_probs, axis=1)

    def predict_proba(self) -> np.ndarray:
        """
        Get probability distributions over true labels
        Returns:
            2D array (n_examples, n_classes), or None before fit()
        """
        return self.true_label_probs

    def get_annotator_quality(self) -> pd.DataFrame:
        """
        Calculate quality metrics for each annotator
        Returns:
            DataFrame with annotator quality metrics
        """
        metrics = []
        for j in range(self.n_annotators):
            per_class_accuracy = np.diag(self.error_rates[j])
            # Accuracy (unweighted mean of the error-matrix diagonal)
            accuracy = np.mean(per_class_accuracy)
            metrics.append({
                'annotator_id': j,
                'accuracy': accuracy,
                # Worst-case accuracy (minimum diagonal element)
                'min_class_accuracy': np.min(per_class_accuracy),
                # Confusion (off-diagonal mass)
                'confusion_rate': 1 - accuracy
            })
        return pd.DataFrame(metrics)

    def visualize_error_matrices(self, annotator_ids: List[int] = None,
                                 class_names: List[str] = None,
                                 output_path: str = 'dawid_skene_error_matrices.png'):
        """
        Visualize error matrices for annotators
        Args:
            annotator_ids: Which annotators to plot (default: all)
            class_names: Names for classes (default: 0, 1, 2, ...)
            output_path: Where the figure is saved
        """
        import matplotlib.pyplot as plt
        import seaborn as sns
        if annotator_ids is None:
            annotator_ids = range(self.n_annotators)
        annotator_ids = list(annotator_ids)
        if not annotator_ids:
            return  # nothing to draw
        if class_names is None:
            class_names = [str(i) for i in range(self.n_classes)]
        n_plots = len(annotator_ids)
        # squeeze=False always returns a 2D axes array, even for one plot
        fig, axes = plt.subplots(1, n_plots, figsize=(5 * n_plots, 4), squeeze=False)
        for ax, annotator_id in zip(axes[0], annotator_ids):
            sns.heatmap(
                self.error_rates[annotator_id],
                annot=True,
                fmt='.3f',
                cmap='YlOrRd',
                xticklabels=class_names,
                yticklabels=class_names,
                ax=ax,
                cbar_kws={'label': 'Probability'}
            )
            ax.set_title(f'Annotator {annotator_id} Error Matrix')
            ax.set_xlabel('Observed Label')
            ax.set_ylabel('True Label')
        fig.tight_layout()
        fig.savefig(output_path, dpi=150, bbox_inches='tight')
        plt.close(fig)
# Example usage with synthetic data
np.random.seed(42)  # fixed seed so the comparison below is reproducible
# Create synthetic annotation scenario
n_examples = 500
n_annotators = 5
n_classes = 3
# Generate true labels
true_labels = np.random.randint(0, n_classes, n_examples)
# Create annotations with varying quality annotators
# Start fully "missing" (-1); only the sampled entries are filled in below
annotations = np.full((n_examples, n_annotators), -1, dtype=int)
# Annotator quality levels
annotator_accuracy = [0.95, 0.85, 0.75, 0.65, 0.55]
for j in range(n_annotators):
    # Each annotator sees 80% of examples
    annotated_mask = np.random.random(n_examples) < 0.8
    for i in range(n_examples):
        if annotated_mask[i]:
            if np.random.random() < annotator_accuracy[j]:
                # Correct label
                annotations[i, j] = true_labels[i]
            else:
                # Random error: uniform over the labels that are not the true one
                wrong_labels = [l for l in range(n_classes) if l != true_labels[i]]
                annotations[i, j] = np.random.choice(wrong_labels)
# Fit Dawid-Skene model
ds_model = DawidSkeneModel(n_classes=n_classes, n_annotators=n_annotators)
ds_model.fit(annotations, verbose=True)
# Get predictions
predicted_labels = ds_model.predict()
# Calculate accuracy vs true labels
accuracy = np.mean(predicted_labels == true_labels)
print(f"\nDawid-Skene Accuracy: {accuracy:.3f}")
# Compare to majority vote
def majority_vote(annotations):
    """
    Aggregate labels per example by simple majority

    Args:
        annotations: 2D array (n_examples, n_annotators), -1 = missing
    Returns:
        Array with the most frequent non-missing label of each row. Ties go
        to the smallest label, and rows without any annotation get 0.
    """
    def winner(row):
        present = row[row != -1]
        if len(present) == 0:
            return 0  # Default
        candidates, frequencies = np.unique(present, return_counts=True)
        # argmax returns the first maximum, i.e. the smallest tied label
        return candidates[np.argmax(frequencies)]

    return np.array([winner(annotations[i]) for i in range(annotations.shape[0])])
# Baseline: plain majority vote on the same annotation matrix
mv_predictions = majority_vote(annotations)
mv_accuracy = np.mean(mv_predictions == true_labels)
print(f"Majority Vote Accuracy: {mv_accuracy:.3f}")
# Annotator quality metrics
quality_metrics = ds_model.get_annotator_quality()
print("\nAnnotator Quality (Estimated by Dawid-Skene):")
print(quality_metrics)
# Print the accuracies used to simulate each annotator, for comparison
print("\nTrue Annotator Quality:")
for j, acc in enumerate(annotator_accuracy):
    print(f" Annotator {j}: {acc:.3f}")
# Visualize error matrices
ds_model.visualize_error_matrices(annotator_ids=[0, 2, 4],
                                  class_names=['Class A', 'Class B', 'Class C'])
18.2.3 When to Use Dawid-Skene
Code
import pandas as pd
# Comparison of label-aggregation methods, written one row per method.
# The values in each row follow the order of the `columns` list.
aggregation_methods = pd.DataFrame(
    [
        ('Majority Vote', 'All annotators roughly equal quality', 'O(n)', 3,
         'Yes', 'Hard labels', 'scipy.stats.mode'),
        ('Weighted Vote', 'Known annotator quality scores', 'O(n)', 3,
         'Yes', 'Hard labels', 'Custom'),
        ('Dawid-Skene', 'Unknown annotator quality, sufficient data',
         'O(n × k × iter)', 3, 'Yes', 'Soft labels + annotator quality',
         'Custom (shown above)'),
        ('MACE', 'Bayesian approach preferred', 'O(n × k × iter)', 3,
         'Yes', 'Soft labels + annotator competence', 'GitHub: dirko/mace'),
        ('GLAD', 'Item difficulty varies greatly', 'O(n × k × iter)', 3,
         'Yes', 'Soft labels + item difficulty', 'GitHub: welinder'),
        ('Expert Adjudication', 'High-stakes, low-volume',
         'O(n × expert_time)', 1, 'N/A', 'Hard labels', 'Human review'),
    ],
    columns=[
        'Method',
        'Best When',
        'Computational Cost',
        'Min Annotators',
        'Handles Missing Data',
        'Output',
        'Implementation',
    ],
)
aggregation_methods
19 Label Studio: Implementation
19.1 Installation & Setup
19.1.1 Docker Installation (Recommended)
# Pull latest Label Studio image
docker pull heartexlabs/label-studio:latest
# Run with persistent storage
# The volume spec is quoted so that a working directory whose path contains
# spaces is still passed to docker as a single argument
docker run -d \
  --name label-studio \
  -p 8080:8080 \
  -v "$(pwd)/mydata:/label-studio/data" \
  heartexlabs/label-studio:latest
# Access at http://localhost:8080
# Default credentials: Set up on first visit
19.1.2 Python Installation
# Install via pip
pip install label-studio
# Or install with ML backend support
# Quoted because some shells (e.g. zsh) treat [ml] as a glob pattern
pip install 'label-studio[ml]'
# Run server
label-studio start
# Specify port
label-studio start --port 8080
# With custom data directory
label-studio start --data-dir ./my-label-studio-data
19.1.3 Configuration Files
Code
# config.json - Label Studio configuration
{
"port": 8080,
"host": "0.0.0.0",
"data_dir": "./data",
"database": "label_studio.sqlite3",
"allow_signup": false,
"username": "admin@example.com",
"password": "secure_password_here"
}
19.2 Project Setup
19.2.1 Creating Projects Programmatically
Code
import requests
import json
# Label Studio API endpoint
BASE_URL = "http://localhost:8080"
API_KEY = "your_api_key_here" # Get from Account & Settings
# Sent with every API request below
headers = {
    "Authorization": f"Token {API_KEY}",
    "Content-Type": "application/json"
}
# Create NER project
# label_config is the labeling-interface XML: one text field plus three
# entity labels that attach to it
ner_project = {
    "title": "Named Entity Recognition",
    "description": "Annotate entities in text documents",
    "label_config": '''
<View>
<Text name="text" value="$text"/>
<Labels name="label" toName="text">
<Label value="PERSON" background="red"/>
<Label value="ORG" background="blue"/>
<Label value="LOC" background="green"/>
</Labels>
</View>
''',
    "sampling": "uniform",
    "show_collab_predictions": False
}
response = requests.post(
    f"{BASE_URL}/api/projects",
    headers=headers,
    json=ner_project,
    timeout=30  # requests waits indefinitely unless a timeout is given
)
# Fail here on an HTTP error instead of with a KeyError on the error body
response.raise_for_status()
project_id = response.json()['id']
print(f"Created project ID: {project_id}")
19.3 Data Import
19.3.1 Importing Tasks
Code
# Import tasks from JSON
# Each task wraps its payload in "data"; the "text" key matches the $text
# variable referenced by the project's label config
tasks = [
    {
        "data": {
            "text": "Apple Inc. CEO Tim Cook announced new products."
        }
    },
    {
        "data": {
            "text": "Microsoft is headquartered in Redmond, Washington."
        }
    }
]
response = requests.post(
    f"{BASE_URL}/api/projects/{project_id}/import",
    headers=headers,
    json=tasks,
    timeout=30  # requests waits indefinitely unless a timeout is given
)
# Without this check the success message would be printed even if the
# server rejected the import
response.raise_for_status()
print(f"Imported {len(tasks)} tasks")
19.3.2 Import from Files
Code
import pandas as pd
# Create dataset
df = pd.DataFrame({
'text': [
'Example text 1',
'Example text 2',
'Example text 3'
],
'metadata': [
{'source': 'web'},
{'source': 'pdf'},
{'source': 'api'}
]
})
# Convert to Label Studio format
tasks = []
for idx, row in df.iterrows():
tasks.append({
'data': {
'text': row['text'],
'id': idx
},
'meta': row['metadata']
})
# Save and import
with open('tasks.json', 'w') as f:
json.dump(tasks, f)
# Import via file upload
files = {'file': open('tasks.json', 'rb')}
response = requests.post(
f"{BASE_URL}/api/projects/{project_id}/import",
headers=headers,
files=files
)19.4 Workflow Automation
19.4.1 Task Assignment & Routing
Code
class TaskRouter:
    """
    Intelligent task routing based on annotator performance

    Talks to the Label Studio HTTP API with token authentication.
    NOTE(review): the annotator-stats and assignment endpoints used below
    are taken as given from this example - confirm them against the API of
    the Label Studio version in use.
    """

    def __init__(self, api_key: str, base_url: str = "http://localhost:8080",
                 timeout: float = 30.0):
        """
        Args:
            api_key: Label Studio API token
            base_url: Root URL of the Label Studio server
            timeout: Seconds to wait for each HTTP request
        """
        self.api_key = api_key
        self.base_url = base_url
        self.timeout = timeout
        self.headers = {
            "Authorization": f"Token {api_key}",
            "Content-Type": "application/json"
        }

    def get_annotator_stats(self, project_id: int) -> pd.DataFrame:
        """
        Get performance stats for all annotators
        Returns:
            DataFrame with columns annotator_id, email, total_annotations,
            avg_time and accuracy (present even when there are no rows)
        Raises:
            requests.HTTPError: If the server answers with an error status
        """
        response = requests.get(
            f"{self.base_url}/api/projects/{project_id}/annotators",
            headers=self.headers,
            timeout=self.timeout
        )
        response.raise_for_status()
        stats = [
            {
                'annotator_id': ann['id'],
                'email': ann['email'],
                'total_annotations': ann.get('total_annotations', 0),
                'avg_time': ann.get('avg_lead_time', 0),
                'accuracy': ann.get('accuracy', 0)  # If gold tasks exist
            }
            for ann in response.json()
        ]
        # Explicit columns keep the frame sortable when no annotators exist
        return pd.DataFrame(
            stats,
            columns=['annotator_id', 'email', 'total_annotations', 'avg_time', 'accuracy']
        )

    def assign_tasks_by_skill(self, project_id: int, task_ids: List[int],
                              difficulty_scores: List[float]):
        """
        Assign tasks to annotators based on skill and task difficulty
        Args:
            project_id: Label Studio project ID
            task_ids: List of task IDs to assign
            difficulty_scores: Difficulty score for each task (0-1)
        Raises:
            ValueError: If the project has no annotators
        """
        # Get annotator stats
        annotators = self.get_annotator_stats(project_id)
        if annotators.empty:
            raise ValueError(f"No annotators available for project {project_id}")
        # Sort annotators by accuracy
        annotators = annotators.sort_values('accuracy', ascending=False)
        # Assign difficult tasks to skilled annotators
        for task_id, difficulty in zip(task_ids, difficulty_scores):
            if difficulty > 0.7:
                # Hard task - assign to top annotator
                annotator = annotators.iloc[0]
            elif difficulty > 0.4:
                # Medium task - assign to middle annotator
                annotator = annotators.iloc[len(annotators) // 2]
            else:
                # Easy task - can go to anyone
                annotator = annotators.sample(1).iloc[0]
            # Values read from a DataFrame row are NumPy scalars, which the
            # JSON encoder rejects; convert to the plain Python value
            raw_id = annotator['annotator_id']
            annotator_id = raw_id.item() if hasattr(raw_id, 'item') else raw_id
            # Make assignment via API
            self._assign_task(project_id, task_id, annotator_id)

    def _assign_task(self, project_id: int, task_id: int, annotator_id: int):
        """
        Assign specific task to annotator
        Returns:
            Decoded JSON body of the server response
        Raises:
            requests.HTTPError: If the server answers with an error status
        """
        payload = {
            'task_id': task_id,
            'annotator_id': annotator_id
        }
        response = requests.post(
            f"{self.base_url}/api/projects/{project_id}/tasks/{task_id}/assignments",
            headers=self.headers,
            json=payload,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()
# Example usage
router = TaskRouter(api_key=API_KEY)
# Get list of tasks
response = requests.get(
    f"{BASE_URL}/api/projects/{project_id}/tasks",
    headers=headers,
    timeout=30  # requests waits indefinitely unless a timeout is given
)
# Stop on an HTTP error rather than iterating over an error body below
response.raise_for_status()
tasks = response.json()
# Calculate difficulty (example: based on text length)
# Text length is scaled so that 1000+ characters map to the maximum of 1.0
task_ids = [t['id'] for t in tasks]
difficulty_scores = [min(1.0, len(t['data']['text']) / 1000) for t in tasks]
# Assign tasks
router.assign_tasks_by_skill(project_id, task_ids, difficulty_scores)
19.4.2 Quality Control Automation
Code
class QualityController:
"""
Automated quality control for Label Studio
"""
def __init__(self, api_key: str, base_url: str = "http://localhost:8080"):
self.api_key = api_key
self.base_url = base_url
self.headers = {
"Authorization": f"Token {api_key}",
"Content-Type": "application/json"
}
def inject_gold_tasks(self, project_id: int, gold_ratio: float = 0.1):
"""
Inject gold standard tasks for quality monitoring
Args:
project_id: Project ID
gold_ratio: Proportion of gold tasks (0-1)
"""
# Get existing tasks
response = requests.get(
f"{self.base_url}/api/projects/{project_id}/tasks",
headers=self.headers
)
tasks = response.json()
# Randomly select tasks to be gold
n_gold = int(len(tasks) * gold_ratio)
gold_task_ids = np.random.choice(
[t['id'] for t in tasks],
size=n_gold,
replace=False
)
# Mark as gold and add ground truth
for task_id in gold_task_ids:
# Get task
task = next(t for t in tasks if t['id'] == task_id)
# Add ground truth annotation (example for NER)
gold_annotation = {
'result': [
{
'value': {
'start': 0,
'end': 9,
'text': 'Apple Inc',
'labels': ['ORG']
},
'from_name': 'label',
'to_name': 'text',
'type': 'labels'
}
],
'ground_truth': True
}
# Update task
requests.post(
f"{self.base_url}/api/tasks/{task_id}/annotations",
headers=self.headers,
json=gold_annotation
)
def check_annotator_quality(self, project_id: int,
min_accuracy: float = 0.8) -> List[int]:
"""
Check annotator quality against gold tasks
Args:
project_id: Project ID
min_accuracy: Minimum acceptable accuracy
Returns:
List of annotator IDs below threshold
"""
# Get all annotations
response = requests.get(
f"{self.base_url}/api/projects/{project_id}/annotations",
headers=self.headers
)
annotations = response.json()
# Calculate accuracy per annotator on gold tasks
annotator_accuracy = {}
for ann in annotations:
if ann.get('ground_truth'):
continue # Skip ground truth annotations
task_id = ann['task']
annotator_id = ann['completed_by']
# Get ground truth for this task
gt_response = requests.get(
f"{self.base_url}/api/tasks/{task_id}/annotations",
headers=self.headers,
params={'ground_truth': True}
)
ground_truth = gt_response.json()
if not ground_truth:
continue
# Compare annotation to ground truth
is_correct = self._compare_annotations(
ann['result'],
ground_truth[0]['result']
)
if annotator_id not in annotator_accuracy:
annotator_accuracy[annotator_id] = []
annotator_accuracy[annotator_id].append(is_correct)
# Find annotators below threshold
low_performers = []
for annotator_id, results in annotator_accuracy.items():
accuracy = np.mean(results)
if accuracy < min_accuracy:
low_performers.append(annotator_id)
return low_performers
def _compare_annotations(self, ann1: List[dict], ann2: List[dict]) -> bool:
"""
Compare two annotations for equality
Simplified - actual implementation depends on annotation type
"""
# For NER, compare entity spans and labels
if len(ann1) != len(ann2):
return False
# Sort by start position
ann1_sorted = sorted(ann1, key=lambda x: x['value']['start'])
ann2_sorted = sorted(ann2, key=lambda x: x['value']['start'])
for a1, a2 in zip(ann1_sorted, ann2_sorted):
if (a1['value']['start'] != a2['value']['start'] or
a1['value']['end'] != a2['value']['end'] or
a1['value']['labels'] != a2['value']['labels']):
return False
return True
def auto_review_consensus(self, project_id: int, consensus_threshold: int = 2):
    """
    Accept every multiply-annotated task whose annotators agree

    Requests the project's tasks filtered with annotations__gt=1, loads the
    annotations of each returned task and accepts the task when
    _has_consensus reports enough agreement.

    Args:
        project_id: Project ID
        consensus_threshold: Minimum number of agreeing annotators
    """
    tasks_url = f"{self.base_url}/api/projects/{project_id}/tasks"
    multi_annotated = requests.get(
        tasks_url,
        headers=self.headers,
        params={'annotations__gt': 1}
    ).json()

    for task_id in (task['id'] for task in multi_annotated):
        task_annotations = requests.get(
            f"{self.base_url}/api/tasks/{task_id}/annotations",
            headers=self.headers
        ).json()
        if not self._has_consensus(task_annotations, consensus_threshold):
            continue
        # Enough annotators agree: auto-accept
        self._accept_task(task_id)
def _has_consensus(self, annotations: List[dict], threshold: int) -> bool:
"""Check if annotations have consensus"""
# Simplified - actual implementation depends on annotation type
# For classification, check if threshold annotators agree on label
if len(annotations) < threshold:
return False
# Count label frequencies
from collections import Counter
labels = [ann['result'][0]['value']['choices'][0]
for ann in annotations
if ann['result']]
most_common = Counter(labels).most_common(1)
if most_common and most_common[0][1] >= threshold:
return True
return False
def _accept_task(self, task_id: int):
    """Accept a task by PATCHing is_labeled=True onto its task endpoint"""
    endpoint = f"{self.base_url}/api/tasks/{task_id}"
    payload = {'is_labeled': True}
    requests.patch(endpoint, headers=self.headers, json=payload)
# Example usage
# NOTE(review): API_KEY and project_id are not defined in this snippet -
# presumably set earlier in the chapter; confirm before running.
qc = QualityController(api_key=API_KEY)
# Inject 10% gold tasks
qc.inject_gold_tasks(project_id, gold_ratio=0.1)
# Check annotator quality: IDs whose mean accuracy on gold tasks is below 0.8
low_performers = qc.check_annotator_quality(project_id, min_accuracy=0.8)
print(f"Annotators below threshold: {low_performers}")
# Auto-review with consensus
qc.auto_review_consensus(project_id, consensus_threshold=3)
19.5 ML Backend Integration
19.5.1 Pre-annotation with Models
Code
from label_studio_ml.api import LabelStudioMLBase, init_app
from label_studio_ml.utils import get_single_tag_keys
import torch
from transformers import pipeline
class NERPredictor(LabelStudioMLBase):
    """
    Custom ML backend for NER pre-annotation
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Load pre-trained model
        self.model = pipeline(
            "ner",
            model="dslim/bert-base-NER",
            aggregation_strategy="simple"
        )
        # Map model labels to Label Studio labels
        self.label_map = {
            'PER': 'PERSON',
            'ORG': 'ORG',
            'LOC': 'LOC',
            'MISC': 'MISC'
        }

    def predict(self, tasks, **kwargs):
        """
        Generate predictions for tasks

        Args:
            tasks: List of Label Studio tasks
        Returns:
            List of predictions in Label Studio format
        """
        predictions = []
        for task in tasks:
            text = task['data']['text']
            results = []
            # Run NER model and convert each entity to Label Studio format
            for entity in self.model(text):
                group = entity['entity_group']
                start, end = entity['start'], entity['end']
                results.append({
                    'from_name': 'label',
                    'to_name': 'text',
                    'type': 'labels',
                    'value': {
                        'start': start,
                        'end': end,
                        # Slice the source text: the pipeline's 'word' is
                        # rebuilt from tokens and need not match the
                        # original characters exactly.
                        'text': text[start:end],
                        # Unmapped model labels pass through unchanged
                        'labels': [self.label_map.get(group, group)]
                    },
                    # The pipeline may return NumPy scalars, which the JSON
                    # encoder rejects; store a plain float.
                    'score': float(entity['score'])
                })
            scores = [r['score'] for r in results]
            predictions.append({
                'result': results,
                'score': sum(scores) / len(scores) if scores else 0,
                'model_version': 'bert-base-NER'
            })
        return predictions

    def _label_tokens(self, text, tokens, start, end, label, token_labels=None):
        """
        Tag the whitespace tokens that overlap a character span (BIO scheme)

        Args:
            text: Original text the tokens were split from
            tokens: Result of text.split()
            start: Character offset start of the entity
            end: Character offset end of the entity (exclusive)
            label: Entity type
            token_labels: Labels assigned so far; a copy is updated so that
                earlier spans are kept. Defaults to all 'O'.
        Returns:
            New list of token labels
        """
        labels = list(token_labels) if token_labels is not None else ['O'] * len(tokens)
        cursor = 0
        inside = False
        for index, token in enumerate(tokens):
            # Locate each token in the text so runs of whitespace do not
            # shift the offsets.
            token_start = text.find(token, cursor)
            if token_start < 0:
                break
            token_end = token_start + len(token)
            cursor = token_end
            if token_start < end and token_end > start:
                labels[index] = f"{'I' if inside else 'B'}-{label}"
                inside = True
        return labels

    def fit(self, completions, workdir=None, **kwargs):
        """
        Fine-tune model on completed annotations

        Only the first annotation of each completion is used; completions
        without annotations are skipped.

        Args:
            completions: Completed annotations from Label Studio
            workdir: Working directory for model storage
        Returns:
            Dict with the new model version
        """
        # Extract training data
        texts = []
        labels = []
        for completion in completions:
            task_annotations = completion.get('annotations') or []
            if not task_annotations:
                continue
            text = completion['data']['text']
            # Convert to token-level labels
            # (Simplified - actual implementation needs word tokenization)
            tokens = text.split()
            token_labels = ['O'] * len(tokens)
            for ann in task_annotations[0]['result']:
                value = ann['value']
                # Pass the running labels in so each span adds to, rather
                # than replaces, the tags of the previous spans.
                token_labels = self._label_tokens(
                    text, tokens, value['start'], value['end'],
                    value['labels'][0], token_labels
                )
            texts.append(tokens)
            labels.append(token_labels)
        # Fine-tune model (pseudo-code - actual implementation varies)
        # self.model.fine_tune(texts, labels, epochs=3)
        return {'model_version': 'fine-tuned-v1'}
# Run ML backend server
if __name__ == '__main__':
app = init_app(NERPredictor)
app.run(host='0.0.0.0', port=9090)
19.5.2 Active Learning Loop
Code
class ActiveLearningManager:
    """
    Manage active learning loop for efficient annotation
    """

    # Sampling strategies understood by select_uncertain_samples
    STRATEGIES = ('entropy', 'margin', 'random')

    def __init__(self, api_key: str, ml_backend_url: str,
                 base_url: str = "http://localhost:8080"):
        """
        Args:
            api_key: API token, sent as "Token <api_key>"
            ml_backend_url: Base URL of the ML backend (/predict, /train)
            base_url: Base URL of the annotation server
        """
        self.api_key = api_key
        self.ml_backend_url = ml_backend_url
        self.base_url = base_url
        self.headers = {
            "Authorization": f"Token {api_key}",
            "Content-Type": "application/json"
        }

    def _uncertainty(self, pred: dict, strategy: str) -> float:
        """Score one prediction; a larger value means more uncertain"""
        if strategy == 'random':
            return float(np.random.random())
        if strategy == 'entropy':
            scores = [r['score'] for r in pred['result']]
            total = np.sum(scores) if scores else 0
            if total <= 0:
                # No scores (or all-zero scores) cannot be normalized
                return 0.0
            # Normalize scores to probabilities
            probs = np.array(scores) / total
            return float(-np.sum(probs * np.log(probs + 1e-10)))
        if strategy == 'margin':
            # Margin sampling - difference between top 2 predictions
            scores = sorted((r['score'] for r in pred['result']), reverse=True)
            if len(scores) < 2:
                return 0.0
            # Lower margin = higher uncertainty
            return float(1 - (scores[0] - scores[1]))
        raise ValueError(
            f"Unknown strategy {strategy!r}; expected one of {self.STRATEGIES}"
        )

    def select_uncertain_samples(self, project_id: int,
                                 n_samples: int = 100,
                                 strategy: str = 'entropy') -> List[int]:
        """
        Select most uncertain samples for annotation

        Args:
            project_id: Label Studio project ID
            n_samples: Number of samples to select
            strategy: 'entropy', 'margin', or 'random'
        Returns:
            List of task IDs to annotate, least uncertain of the selection
            first
        Raises:
            ValueError: If strategy is not a supported name
        """
        if strategy not in self.STRATEGIES:
            raise ValueError(
                f"Unknown strategy {strategy!r}; expected one of {self.STRATEGIES}"
            )
        if n_samples <= 0:
            # [-0:] would slice the whole array and select every task
            return []

        # Get all unlabeled tasks
        response = requests.get(
            f"{self.base_url}/api/projects/{project_id}/tasks",
            headers=self.headers,
            params={'is_labeled': False}
        )
        tasks = response.json()
        if not tasks:
            return []

        # Get predictions from ML backend
        task_ids = [t['id'] for t in tasks]
        predictions = requests.post(
            f"{self.ml_backend_url}/predict",
            json={'tasks': tasks}
        ).json()

        # Calculate uncertainty scores (paired with tasks by position)
        uncertainties = [
            self._uncertainty(pred, strategy)
            for _task, pred in zip(tasks, predictions)
        ]

        # Select top-k uncertain samples
        uncertain_indices = np.argsort(uncertainties)[-n_samples:]
        return [task_ids[i] for i in uncertain_indices]

    def run_active_learning_cycle(self, project_id: int,
                                  n_iterations: int = 5,
                                  samples_per_iter: int = 100,
                                  strategy: str = 'entropy'):
        """
        Run complete active learning cycle
        1. Train model on labeled data
        2. Select uncertain samples
        3. Send for annotation
        4. Wait for completion
        5. Repeat

        Args:
            project_id: Label Studio project ID
            n_iterations: Number of train/select/annotate rounds
            samples_per_iter: Tasks selected per round
            strategy: Sampling strategy passed to select_uncertain_samples
        """
        for iteration in range(n_iterations):
            print(f"\n=== Active Learning Iteration {iteration + 1} ===")
            # Step 1: Train model on current labeled data
            self._trigger_model_training(project_id)
            # Step 2: Select uncertain samples
            selected_tasks = self.select_uncertain_samples(
                project_id,
                n_samples=samples_per_iter,
                strategy=strategy
            )
            print(f"Selected {len(selected_tasks)} uncertain tasks")
            # Step 3: Prioritize these tasks for annotation
            self._prioritize_tasks(project_id, selected_tasks)
            # Step 4: Wait for annotations (in practice, this would be async)
            print("Waiting for annotations...")
            self._wait_for_annotations(project_id, selected_tasks)
            # Step 5: Evaluate progress
            labeled_count = self._get_labeled_count(project_id)
            print(f"Total labeled tasks: {labeled_count}")

    def _trigger_model_training(self, project_id: int):
        """Send the project's annotations to the ML backend /train endpoint"""
        # Get completed annotations
        response = requests.get(
            f"{self.base_url}/api/projects/{project_id}/annotations",
            headers=self.headers
        )
        completions = response.json()
        # Send to ML backend for training
        train_response = requests.post(
            f"{self.ml_backend_url}/train",
            json={'completions': completions}
        )
        return train_response.json()

    def _prioritize_tasks(self, project_id: int, task_ids: List[int]):
        """Move tasks to top of queue by PATCHing priority 10 on each"""
        for task_id in task_ids:
            requests.patch(
                f"{self.base_url}/api/tasks/{task_id}",
                headers=self.headers,
                json={'priority': 10}  # High priority
            )

    def _is_labeled(self, task_id: int) -> bool:
        """Fetch one task and report its is_labeled flag"""
        response = requests.get(
            f"{self.base_url}/api/tasks/{task_id}",
            headers=self.headers
        )
        return bool(response.json().get('is_labeled'))

    def _wait_for_annotations(self, project_id: int, task_ids: List[int],
                              poll_interval: float = 60,
                              timeout: float = None):
        """
        Wait until tasks are annotated (simplified)

        Args:
            project_id: Label Studio project ID (unused, kept for symmetry)
            task_ids: Tasks that must all be labeled
            poll_interval: Seconds to sleep between polling rounds
            timeout: Give up after this many seconds; None waits forever
        Raises:
            TimeoutError: If timeout elapses before all tasks are labeled
        """
        import time

        deadline = None if timeout is None else time.monotonic() + timeout
        # all() stops at the first unlabeled task, so a round costs at most
        # one request per task.
        while not all(self._is_labeled(task_id) for task_id in task_ids):
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Tasks were not all labeled within {timeout} seconds"
                )
            time.sleep(poll_interval)

    def _get_labeled_count(self, project_id: int) -> int:
        """Count labeled tasks"""
        response = requests.get(
            f"{self.base_url}/api/projects/{project_id}/tasks",
            headers=self.headers,
            params={'is_labeled': True}
        )
        return len(response.json())
# Run active learning
# The manager sends training data and prediction requests to the ML backend
# at ml_backend_url.
# NOTE(review): API_KEY is not defined in this snippet - presumably set
# earlier in the chapter; confirm before running.
al_manager = ActiveLearningManager(
api_key=API_KEY,
ml_backend_url="http://localhost:9090"
)
al_manager.run_active_learning_cycle(
project_id=project_id,
n_iterations=5,
samples_per_iter=100
)
19.6 Export & Analysis
19.6.1 Export Annotations
Code
class AnnotationExporter:
    """
    Export annotations in various formats

    All exporters read the project's JSON export and use only the first
    annotation of each task; tasks without annotations contribute no labels.
    """

    def __init__(self, api_key: str, base_url: str = "http://localhost:8080"):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {
            "Authorization": f"Token {api_key}",
            "Content-Type": "application/json"
        }

    def _fetch_export(self, project_id: int) -> list:
        """Download the project's JSON export (one dict per task)"""
        response = requests.get(
            f"{self.base_url}/api/projects/{project_id}/export",
            headers=self.headers,
            params={'exportType': 'JSON'}
        )
        return response.json()

    @staticmethod
    def _first_results(task: dict, result_type: str) -> list:
        """Results of one type from the task's first annotation ([] if none)"""
        task_annotations = task.get('annotations') or []
        if not task_annotations:
            return []
        return [
            result for result in task_annotations[0]['result']
            if result['type'] == result_type
        ]

    def _rectangle_classes(self, tasks: list) -> list:
        """Sorted, de-duplicated rectangle label names across all tasks"""
        names = set()
        for task in tasks:
            for result in self._first_results(task, 'rectanglelabels'):
                names.update(result['value']['rectanglelabels'])
        return sorted(names)

    def _build_coco(self, tasks: list) -> dict:
        """
        Convert exported tasks to a COCO dict (images/annotations/categories)

        Box coordinates in the export are percentages of the image size and
        are converted to pixels. The image size comes from the task data
        ('width'/'height'); when absent, the 'original_width' and
        'original_height' of the first rectangle result are used, else 0.
        """
        category_map = {
            name: index
            for index, name in enumerate(self._rectangle_classes(tasks))
        }
        coco = {
            'images': [],
            'annotations': [],
            'categories': [
                {'id': index, 'name': name}
                for name, index in category_map.items()
            ]
        }
        for image_id, task in enumerate(tasks):
            data = task['data']
            boxes = self._first_results(task, 'rectanglelabels')
            first_box = boxes[0] if boxes else {}
            img_width = data.get('width', first_box.get('original_width', 0))
            img_height = data.get('height', first_box.get('original_height', 0))
            # Image info
            coco['images'].append({
                'id': image_id,
                'file_name': data.get('image', ''),
                'width': img_width,
                'height': img_height
            })
            # Annotations for this image
            for result in boxes:
                box = result['value']
                # Convert percentage to pixels
                x = box['x'] * img_width / 100
                y = box['y'] * img_height / 100
                w = box['width'] * img_width / 100
                h = box['height'] * img_height / 100
                for label in box['rectanglelabels']:
                    coco['annotations'].append({
                        # Sequential id across the whole dataset
                        'id': len(coco['annotations']),
                        'image_id': image_id,
                        'category_id': category_map[label],
                        'bbox': [x, y, w, h],
                        'area': w * h,
                        'iscrowd': 0
                    })
        return coco

    @staticmethod
    def _bio_tags(text: str, spans: list) -> tuple:
        """
        Split text on whitespace and tag the tokens with BIO labels

        Args:
            text: Document text
            spans: Iterable of (start, end, entity_type) character spans,
                end exclusive
        Returns:
            (tokens, labels): two parallel lists. A token is tagged when it
            overlaps a span, so trailing punctuation glued to a token does
            not drop the entity; the first overlapping token gets B-, the
            rest I-.
        """
        import re

        # Real character offsets: runs of whitespace do not shift positions
        matches = list(re.finditer(r'\S+', text))
        tokens = [match.group() for match in matches]
        labels = ['O'] * len(tokens)
        for start, end, entity_type in spans:
            prefix = 'B'
            for index, match in enumerate(matches):
                if match.start() < end and match.end() > start:
                    labels[index] = f'{prefix}-{entity_type}'
                    prefix = 'I'
        return tokens, labels

    def export_to_coco(self, project_id: int, output_path: str):
        """
        Export bounding box annotations to COCO format

        Args:
            project_id: Label Studio project ID
            output_path: Path to save COCO JSON file
        """
        coco = self._build_coco(self._fetch_export(project_id))
        # Save to file
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(coco, f, indent=2)
        print(f"Exported {len(coco['images'])} images with {len(coco['annotations'])} annotations")

    def export_to_conll(self, project_id: int, output_path: str):
        """
        Export NER annotations to CoNLL format

        Format:
            token1 O
            token2 B-PER
            token3 I-PER
            token4 O

        Tokens are whitespace-separated (simplified tokenization); a blank
        line separates documents.

        Args:
            project_id: Label Studio project ID
            output_path: Path of the CoNLL file to write
        """
        tasks = self._fetch_export(project_id)
        with open(output_path, 'w', encoding='utf-8') as f:
            for task in tasks:
                spans = [
                    (result['value']['start'],
                     result['value']['end'],
                     result['value']['labels'][0])
                    for result in self._first_results(task, 'labels')
                ]
                tokens, labels = self._bio_tags(task['data']['text'], spans)
                # Write tokens and labels
                f.writelines(
                    f"{token} {label}\n" for token, label in zip(tokens, labels)
                )
                f.write("\n")  # Blank line between documents
        print(f"Exported {len(tasks)} documents to CoNLL format")

    def export_to_yolo(self, project_id: int, output_dir: str):
        """
        Export to YOLO format

        Creates:
        - images/ directory (created empty; image files are not copied)
        - labels/ directory with one .txt file per task
        - data.yaml with class names

        Args:
            project_id: Label Studio project ID
            output_dir: Root directory of the dataset
        """
        import os

        os.makedirs(os.path.join(output_dir, "images"), exist_ok=True)
        labels_dir = os.path.join(output_dir, "labels")
        os.makedirs(labels_dir, exist_ok=True)

        tasks = self._fetch_export(project_id)
        classes = self._rectangle_classes(tasks)
        class_to_id = {cls: i for i, cls in enumerate(classes)}

        # Process each image
        for task in tasks:
            image_name = os.path.basename(task['data']['image'])
            label_name = os.path.splitext(image_name)[0] + '.txt'
            # Write label file
            with open(os.path.join(labels_dir, label_name), 'w', encoding='utf-8') as f:
                for result in self._first_results(task, 'rectanglelabels'):
                    box = result['value']
                    # Convert to YOLO format (normalized center coordinates);
                    # the export stores percentages, so no pixel size is needed
                    x_center = (box['x'] + box['width'] / 2) / 100
                    y_center = (box['y'] + box['height'] / 2) / 100
                    width = box['width'] / 100
                    height = box['height'] / 100
                    for label in box['rectanglelabels']:
                        class_id = class_to_id[label]
                        f.write(f"{class_id} {x_center} {y_center} {width} {height}\n")

        # Write data.yaml
        with open(os.path.join(output_dir, "data.yaml"), 'w', encoding='utf-8') as f:
            f.write(f"path: {output_dir}\n")
            f.write("train: images\n")
            f.write("val: images\n")
            f.write(f"nc: {len(classes)}\n")
            f.write(f"names: {classes}\n")
        print(f"Exported to YOLO format in {output_dir}")
# Example usage
# NOTE(review): API_KEY and project_id are not defined in this snippet -
# presumably set earlier in the chapter; confirm before running.
exporter = AnnotationExporter(api_key=API_KEY)
# Export to different formats
# COCO: bounding boxes written to a single JSON file
exporter.export_to_coco(project_id, 'coco_annotations.json')
# CoNLL: NER spans written as one "token label" pair per line
exporter.export_to_conll(project_id, 'ner_annotations.conll')
exporter.export_to_yolo(project_id, 'yolo_dataset')
20 Workforce Management
20.1 Compensation & Pricing
20.1.1 Fair Wage Calculator
class FairWageCalculator:
    """
    Calculate fair compensation for annotation tasks
    """

    def __init__(self, min_hourly_wage: float = 15.0):
        """
        Args:
            min_hourly_wage: Minimum hourly wage in USD
        """
        self.min_hourly_wage = min_hourly_wage
        self.wage_per_second = min_hourly_wage / 3600

    def calculate_piece_rate(self, task_type: str,
                             pilot_times: List[float],
                             complexity_multiplier: float = 1.0) -> dict:
        """
        Calculate fair piece rate based on pilot timing

        Args:
            task_type: Type of task (for logging)
            pilot_times: List of completion times in seconds from pilot
            complexity_multiplier: Adjustment for task difficulty
        Returns:
            Dict with pricing information
        Raises:
            ValueError: If pilot_times is empty or the estimated time per
                task is not positive
        """
        times = np.asarray(pilot_times, dtype=float)
        if times.size == 0:
            raise ValueError("pilot_times must contain at least one measurement")

        median_time = np.median(times)
        p75_time = np.percentile(times, 75)
        # Use 75th percentile to account for learning curve
        # Add 20% buffer for quality work
        estimated_time = p75_time * 1.2 * complexity_multiplier
        if estimated_time <= 0:
            # A non-positive estimate would divide by zero below
            raise ValueError("estimated time per task must be positive")

        # Calculate base rate
        base_rate = estimated_time * self.wage_per_second
        # Round up to nearest cent so rounding never cuts the worker's pay
        piece_rate = np.ceil(base_rate * 100) / 100
        # Calculate expected hourly rate
        tasks_per_hour = 3600 / estimated_time
        effective_hourly = piece_rate * tasks_per_hour
        return {
            'task_type': task_type,
            'piece_rate_usd': piece_rate,
            'median_time_sec': median_time,
            'p75_time_sec': p75_time,
            'estimated_time_sec': estimated_time,
            'tasks_per_hour': tasks_per_hour,
            'effective_hourly_usd': effective_hourly,
            'meets_minimum_wage': effective_hourly >= self.min_hourly_wage
        }

    def calculate_tiered_pricing(self, base_rate: float,
                                 quality_tiers: dict = None,
                                 tasks_per_hour: float = 100) -> pd.DataFrame:
        """
        Create tiered pricing based on quality

        Args:
            base_rate: Base piece rate
            quality_tiers: Dict of {tier_name: multiplier}
            tasks_per_hour: Throughput used for the hourly equivalent
        Returns:
            DataFrame with tier pricing
        """
        if quality_tiers is None:
            quality_tiers = {
                'Entry Level (< 80% accuracy)': 0.8,
                'Standard (80-90% accuracy)': 1.0,
                'Experienced (90-95% accuracy)': 1.2,
                'Expert (> 95% accuracy)': 1.5
            }
        tiers = [
            {
                'tier': tier_name,
                'multiplier': multiplier,
                'piece_rate': base_rate * multiplier,
                'hourly_equivalent': base_rate * multiplier * tasks_per_hour
            }
            for tier_name, multiplier in quality_tiers.items()
        ]
        return pd.DataFrame(tiers)
# Example usage with pilot data
calculator = FairWageCalculator(min_hourly_wage=15.0)

# Simulated pilot timings (seconds) per task type; the fixed seed keeps the
# draws reproducible
np.random.seed(42)
task_types = {
    'Text Classification': np.random.normal(8, 2, 50),  # Mean 8 sec, std 2
    'NER (per document)': np.random.normal(45, 10, 50),
    'Bounding Box': np.random.normal(25, 8, 50),
    'Image Segmentation': np.random.normal(180, 30, 50)
}

# One pricing record per task type
pricing_table = []
for task_type, pilot_times in task_types.items():
    pricing = calculator.calculate_piece_rate(
        task_type, pilot_times, complexity_multiplier=1.0
    )
    pricing_table.append(pricing)
pricing_df = pd.DataFrame(pricing_table)
print("Fair Wage Pricing:", pricing_df, sep="\n")

# Tiered pricing built on the NER piece rate
ner_rate = pricing_df.loc[
    pricing_df['task_type'] == 'NER (per document)', 'piece_rate_usd'
].iloc[0]
tiered_pricing = calculator.calculate_tiered_pricing(ner_rate)
print("\nTiered Pricing for NER:")
print(tiered_pricing)
20.1.2 Client Pricing Calculator
class ClientPricingCalculator:
    """
    Calculate client pricing with markup
    """

    # Share of the client price attributed to each component
    DEFAULT_COST_STRUCTURE = {
        'labor': 0.40,
        'qa_review': 0.15,
        'platform': 0.10,
        'management': 0.12,
        'training': 0.05,
        'support': 0.03,
        'overhead': 0.05,
        'profit': 0.10
    }

    # Price multiplier per volume tier
    VOLUME_DISCOUNTS = {
        'small': 1.0,       # < 10K tasks
        'standard': 0.85,   # 10-100K
        'large': 0.75,      # 100K-1M
        'enterprise': 0.65  # > 1M
    }

    def __init__(self, cost_structure: dict = None):
        """
        Args:
            cost_structure: Dict with cost components as percentages. Must
                contain a positive 'labor' share; 'profit' (default 0) must
                be below 1. Any other key is treated as a cost component.
        Raises:
            ValueError: If the labor or profit share is out of range
        """
        if cost_structure is None:
            cost_structure = dict(self.DEFAULT_COST_STRUCTURE)
        if cost_structure.get('labor', 0) <= 0:
            raise ValueError("cost_structure needs a positive 'labor' share")
        if not 0 <= cost_structure.get('profit', 0) < 1:
            raise ValueError("'profit' share must be in [0, 1)")
        self.cost_structure = cost_structure

    def calculate_client_price(self, labor_cost: float,
                               volume_tier: str = 'standard') -> dict:
        """
        Calculate client price from labor cost

        Args:
            labor_cost: Direct labor cost per task
            volume_tier: 'small', 'standard', 'large', 'enterprise'; any
                other value gets no discount
        Returns:
            Dict with pricing breakdown
        Raises:
            ValueError: If labor_cost is not positive
        """
        if labor_cost <= 0:
            # The markup is relative to labor cost and undefined at zero
            raise ValueError("labor_cost must be positive")

        discount = self.VOLUME_DISCOUNTS.get(volume_tier, 1.0)

        # Scale every non-profit component from the labor cost by the ratio
        # of its share to the labor share
        labor_share = self.cost_structure['labor']
        costs = {'labor': labor_cost}
        for component, share in self.cost_structure.items():
            if component in ('labor', 'profit'):
                continue
            costs[component] = labor_cost * (share / labor_share)

        # Total cost before profit
        total_cost = sum(costs.values())

        # Profit is sized so that it makes up its share of the base price
        profit_share = self.cost_structure.get('profit', 0)
        profit_margin = total_cost * (profit_share / (1 - profit_share))
        costs['profit'] = profit_margin

        # Apply volume discount
        base_price = total_cost + profit_margin
        final_price = base_price * discount
        return {
            'labor_cost': labor_cost,
            'cost_breakdown': costs,
            'total_cost': total_cost,
            'base_price': base_price,
            'volume_tier': volume_tier,
            'volume_discount': discount,
            'final_price': final_price,
            'markup_percentage': (final_price / labor_cost - 1) * 100
        }

    def create_pricing_table(self, labor_costs: dict) -> pd.DataFrame:
        """
        Create comprehensive pricing table

        Args:
            labor_costs: Dict of {task_type: labor_cost_per_task}
        Returns:
            DataFrame with pricing for all volume tiers
        """
        rows = []
        for task_type, labor_cost in labor_costs.items():
            for tier in self.VOLUME_DISCOUNTS:
                pricing = self.calculate_client_price(labor_cost, tier)
                rows.append({
                    'task_type': task_type,
                    'volume_tier': tier,
                    'labor_cost': labor_cost,
                    'total_cost': pricing['total_cost'],
                    'profit': pricing['cost_breakdown']['profit'],
                    'base_price': pricing['base_price'],
                    'discount': f"{(1 - pricing['volume_discount']) * 100:.0f}%",
                    'final_price': pricing['final_price'],
                    'markup': f"{pricing['markup_percentage']:.1f}%"
                })
        return pd.DataFrame(rows)
# Example usage
client_pricer = ClientPricingCalculator()
# Use labor costs from previous example
labor_costs = {
    'Text Classification': 0.03,
    'NER (per document)': 0.18,
    'Bounding Box': 0.10,
    'Image Segmentation': 0.72
}
pricing_table = client_pricer.create_pricing_table(labor_costs)
print("Client Pricing Table:")
print(pricing_table)
# Example: Detailed breakdown for NER task at standard volume
ner_pricing = client_pricer.calculate_client_price(
    labor_cost=0.18,
    volume_tier='standard'
)
print("\nDetailed Cost Breakdown (NER, Standard Volume):")
# The components add up to the undiscounted base price, so each share is
# taken of base_price; dividing by the discounted final price would make the
# shares sum to more than 100%.
for component, cost in ner_pricing['cost_breakdown'].items():
    percentage = (cost / ner_pricing['base_price']) * 100
    print(f" {component.capitalize()}: ${cost:.3f} ({percentage:.1f}%)")
print(f"\nFinal Client Price: ${ner_pricing['final_price']:.3f}")
print(f"Markup over Labor: {ner_pricing['markup_percentage']:.1f}%")
20.2 Worker Performance Tracking
20.2.1 Performance Dashboard
class WorkerPerformanceTracker:
    """
    Track and analyze worker performance metrics

    Events are stored one row per annotation in the `metrics` DataFrame.
    """

    # Columns of the frame returned by calculate_worker_stats()
    STAT_COLUMNS = [
        'worker_id', 'total_tasks', 'total_hours', 'avg_time_sec',
        'median_time_sec', 'cv_time', 'tasks_per_day', 'gold_tasks',
        'accuracy', 'days_active'
    ]

    def __init__(self):
        self.metrics = pd.DataFrame()

    def log_annotation(self, worker_id: str, task_id: str,
                       task_type: str, time_seconds: float,
                       quality_score: float = None, is_gold: bool = False,
                       gold_correct: bool = None):
        """
        Log single annotation event

        Args:
            worker_id: Worker identifier
            task_id: Task identifier
            task_type: Type of annotation task
            time_seconds: Time taken to complete
            quality_score: Quality score if available (0-1)
            is_gold: Whether this was a gold standard task
            gold_correct: Whether worker was correct on gold task
        """
        new_row = {
            'worker_id': worker_id,
            'task_id': task_id,
            'task_type': task_type,
            'time_seconds': time_seconds,
            'quality_score': quality_score,
            'is_gold': is_gold,
            'gold_correct': gold_correct,
            'timestamp': pd.Timestamp.now()
        }
        self.metrics = pd.concat([
            self.metrics,
            pd.DataFrame([new_row])
        ], ignore_index=True)

    def calculate_worker_stats(self, lookback_days: int = 30) -> pd.DataFrame:
        """
        Calculate statistics per worker

        Args:
            lookback_days: Number of days to include in stats
        Returns:
            DataFrame with worker statistics, one row per worker in order of
            first appearance; empty (with the usual columns) when nothing
            has been logged
        """
        if self.metrics.empty:
            return pd.DataFrame(columns=self.STAT_COLUMNS)

        cutoff_date = pd.Timestamp.now() - pd.Timedelta(days=lookback_days)
        recent_data = self.metrics[self.metrics['timestamp'] >= cutoff_date]

        stats = []
        # sort=False keeps workers in order of first appearance
        for worker_id, worker_data in recent_data.groupby('worker_id', sort=False):
            times = worker_data['time_seconds']
            total_tasks = len(worker_data)
            avg_time = times.mean()

            # Gold task performance. "== True" rather than truthiness: the
            # column may hold None/NaN for rows without the flag.
            gold_data = worker_data[worker_data['is_gold'] == True]
            n_gold = len(gold_data)
            graded = gold_data['gold_correct'].dropna()
            accuracy = graded.astype(bool).mean() if len(graded) > 0 else None

            # Tasks per day
            days_active = (worker_data['timestamp'].max() -
                           worker_data['timestamp'].min()).days + 1
            tasks_per_day = total_tasks / days_active if days_active > 0 else 0

            # Calculate consistency (coefficient of variation in time)
            cv_time = times.std() / avg_time if avg_time > 0 else 0

            stats.append({
                'worker_id': worker_id,
                'total_tasks': total_tasks,
                'total_hours': times.sum() / 3600,
                'avg_time_sec': avg_time,
                'median_time_sec': times.median(),
                'cv_time': cv_time,
                'tasks_per_day': tasks_per_day,
                'gold_tasks': n_gold,
                'accuracy': accuracy,
                'days_active': days_active
            })
        return pd.DataFrame(stats, columns=self.STAT_COLUMNS)

    def identify_issues(self, worker_stats: pd.DataFrame,
                        fast_ratio: float = 0.5,
                        slow_ratio: float = 2.0,
                        min_accuracy: float = 0.7,
                        max_cv: float = 0.1,
                        min_tasks_for_cv: int = 20) -> dict:
        """
        Identify potential quality or fraud issues

        Args:
            worker_stats: DataFrame from calculate_worker_stats()
            fast_ratio: Flag workers whose median time is below this
                fraction of the population median
            slow_ratio: Flag workers whose median time is above this
                multiple of the population median
            min_accuracy: Flag workers whose gold accuracy is below this
            max_cv: Flag workers whose timing CV is below this
            min_tasks_for_cv: Tasks a worker must exceed before the CV check
                applies
        Returns:
            Dict with flagged workers and reasons
        """
        issues = {
            'too_fast': [],
            'too_slow': [],
            'low_accuracy': [],
            'suspicious_consistency': []
        }
        # Calculate threshold from population
        median_time = worker_stats['median_time_sec'].median()

        for _, worker in worker_stats.iterrows():
            worker_id = worker['worker_id']
            worker_median = worker['median_time_sec']

            if worker_median < median_time * fast_ratio:
                issues['too_fast'].append({
                    'worker_id': worker_id,
                    'median_time': worker_median,
                    'population_median': median_time
                })
            if worker_median > median_time * slow_ratio:
                issues['too_slow'].append({
                    'worker_id': worker_id,
                    'median_time': worker_median,
                    'population_median': median_time
                })

            # Workers without gold tasks carry None or NaN here (a None in a
            # numeric column becomes NaN), so test with pd.notna.
            accuracy = worker['accuracy']
            if pd.notna(accuracy) and accuracy < min_accuracy:
                issues['low_accuracy'].append({
                    'worker_id': worker_id,
                    'accuracy': accuracy,
                    'n_gold': worker['gold_tasks']
                })

            # Suspiciously consistent timing might indicate automation
            if worker['cv_time'] < max_cv and worker['total_tasks'] > min_tasks_for_cv:
                issues['suspicious_consistency'].append({
                    'worker_id': worker_id,
                    'cv_time': worker['cv_time'],
                    'total_tasks': worker['total_tasks']
                })
        return issues

    def plot_worker_performance(self, worker_id: str):
        """
        Create performance visualization for single worker

        Saves a 2x2 dashboard (time series, time histogram, cumulative gold
        accuracy, tasks per day) to worker_<worker_id>_performance.png.
        Prints a message and returns when the worker has no data.

        Args:
            worker_id: Worker identifier
        """
        import matplotlib.pyplot as plt

        if self.metrics.empty:
            print(f"No data for worker {worker_id}")
            return
        worker_data = self.metrics[self.metrics['worker_id'] == worker_id]
        if len(worker_data) == 0:
            print(f"No data for worker {worker_id}")
            return
        # Chronological order keeps the line plot and the cumulative
        # accuracy meaningful; sort_values also returns a new frame, so
        # nothing below touches self.metrics.
        worker_data = worker_data.sort_values('timestamp')

        fig, axes = plt.subplots(2, 2, figsize=(14, 10))

        # 1. Time series of completion time
        axes[0, 0].plot(worker_data['timestamp'], worker_data['time_seconds'],
                        marker='o', alpha=0.6)
        axes[0, 0].set_title('Task Completion Time Over Time')
        axes[0, 0].set_xlabel('Date')
        axes[0, 0].set_ylabel('Time (seconds)')
        axes[0, 0].tick_params(axis='x', rotation=45)

        # 2. Distribution of completion times
        axes[0, 1].hist(worker_data['time_seconds'], bins=30, edgecolor='black', alpha=0.7)
        axes[0, 1].axvline(worker_data['time_seconds'].median(),
                           color='red', linestyle='--', label='Median')
        axes[0, 1].set_title('Distribution of Completion Times')
        axes[0, 1].set_xlabel('Time (seconds)')
        axes[0, 1].set_ylabel('Frequency')
        axes[0, 1].legend()

        # 3. Accuracy on gold tasks over time
        gold_data = worker_data[worker_data['is_gold'] == True]
        gold_data = gold_data.dropna(subset=['gold_correct'])
        if len(gold_data) > 0:
            # Explicit float conversion: the column is object-typed when it
            # mixes booleans with None
            cumulative_accuracy = (
                gold_data['gold_correct'].astype(float).expanding().mean()
            )
            axes[1, 0].plot(gold_data['timestamp'], cumulative_accuracy,
                            marker='o', color='green')
            axes[1, 0].axhline(0.8, color='red', linestyle='--', label='80% Threshold')
            axes[1, 0].set_title('Cumulative Accuracy on Gold Tasks')
            axes[1, 0].set_xlabel('Date')
            axes[1, 0].set_ylabel('Cumulative Accuracy')
            axes[1, 0].set_ylim([0, 1])
            axes[1, 0].legend()
            axes[1, 0].tick_params(axis='x', rotation=45)
        else:
            axes[1, 0].text(0.5, 0.5, 'No Gold Task Data',
                            ha='center', va='center', fontsize=14)
            axes[1, 0].set_title('Gold Task Performance')

        # 4. Tasks per day (grouped by calendar date without adding a
        # column to the data)
        tasks_per_day = worker_data.groupby(worker_data['timestamp'].dt.date).size()
        axes[1, 1].bar(range(len(tasks_per_day)), tasks_per_day.values, alpha=0.7)
        axes[1, 1].set_title('Tasks Completed Per Day')
        axes[1, 1].set_xlabel('Day')
        axes[1, 1].set_ylabel('Number of Tasks')

        plt.suptitle(f'Performance Dashboard: Worker {worker_id}', fontsize=16)
        plt.tight_layout()
        plt.savefig(f'worker_{worker_id}_performance.png', dpi=150, bbox_inches='tight')
        plt.close(fig)
# Example usage with synthetic data
tracker = WorkerPerformanceTracker()

# Simulate annotation logs
np.random.seed(42)
n_workers = 10
n_tasks_per_worker = 200

# Rows are collected in a list and turned into a DataFrame once; growing the
# frame with pd.concat on every iteration copies all earlier rows each time.
simulated_rows = []
window_start = pd.Timestamp.now() - pd.Timedelta(days=30)
for worker_id in range(n_workers):
    # Worker characteristics
    base_speed = np.random.uniform(20, 60)  # Seconds per task
    accuracy = np.random.uniform(0.6, 0.95)
    for task_num in range(n_tasks_per_worker):
        # Generate task completion
        time_taken = np.random.normal(base_speed, base_speed * 0.2)
        time_taken = max(5, time_taken)  # Minimum 5 seconds
        # 10% are gold tasks
        is_gold = np.random.random() < 0.1
        gold_correct = np.random.random() < accuracy if is_gold else None
        # One task per hour, starting 30 days ago
        simulated_rows.append({
            'worker_id': f'W{worker_id:03d}',
            'task_id': f'T{task_num:05d}',
            'task_type': 'NER',
            'time_seconds': time_taken,
            'quality_score': None,
            'is_gold': is_gold,
            'gold_correct': gold_correct,
            'timestamp': window_start + pd.Timedelta(hours=task_num)
        })
tracker.metrics = pd.concat(
    [tracker.metrics, pd.DataFrame(simulated_rows)],
    ignore_index=True
)

# Calculate worker statistics
worker_stats = tracker.calculate_worker_stats(lookback_days=30)
print("Worker Performance Statistics:")
print(worker_stats.sort_values('accuracy', ascending=False))

# Identify issues
issues = tracker.identify_issues(worker_stats)
print("\nIdentified Issues:")
for issue_type, workers in issues.items():
    if workers:
        print(f"\n{issue_type.upper()}:")
        for worker_issue in workers:
            print(f" {worker_issue}")

# Plot performance for the top worker (highest gold accuracy)
top_worker = worker_stats.nlargest(1, 'accuracy')['worker_id'].values[0]
tracker.plot_worker_performance(top_worker)
20.3 Payment Processing
20.3.1 Payment Calculator & Tracker
Code
from datetime import datetime, timedelta
from typing import Dict, List
import pandas as pd
import numpy as np
class PaymentProcessor:
"""
Manage worker payments and invoicing
"""
def __init__(self, payment_cycle: str = 'weekly'):
"""
Args:
payment_cycle: 'weekly', 'biweekly', or 'monthly'
"""
self.payment_cycle = payment_cycle
self.payment_ledger = pd.DataFrame()
def calculate_worker_payment(self, worker_id: str,
                             annotations: pd.DataFrame,
                             piece_rates: Dict[str, float],
                             bonus_structure: Dict = None) -> dict:
    """
    Work out what one worker is owed for the given annotation log

    Base pay is the task count per task type times that type's piece rate
    (types missing from piece_rates are paid at 0). Optional bonuses:
    'quality_bonus' adds a percentage of base pay when accuracy on gold
    tasks reaches its threshold, and 'volume_bonus' adds a fixed amount
    when the task count reaches its threshold.

    Args:
        worker_id: Worker ID
        annotations: DataFrame with annotation logs
        piece_rates: Dict of {task_type: rate_per_task}
        bonus_structure: Optional bonus configuration
    Returns:
        Payment details dict
    """
    own_rows = annotations[annotations['worker_id'] == worker_id]
    task_count = len(own_rows)

    # Nothing logged for this worker: all-zero statement
    if task_count == 0:
        return {
            'worker_id': worker_id,
            'total_tasks': 0,
            'base_payment': 0,
            'bonuses': 0,
            'total_payment': 0,
            'effective_hourly': 0,
            'tasks_by_type': {}
        }

    # Base payment, broken down by task type
    per_type = {}
    for task_type, rows in own_rows.groupby('task_type'):
        rate = piece_rates.get(task_type, 0)
        per_type[task_type] = {
            'count': len(rows),
            'rate': rate,
            'amount': len(rows) * rate
        }
    base_total = sum(entry['amount'] for entry in per_type.values())
    details = {'tasks_by_type': per_type, 'base_payment': base_total}

    # Bonuses
    bonus_total = 0
    rules = bonus_structure or {}
    if 'quality_bonus' in rules:
        gold_rows = own_rows[own_rows['is_gold'] == True]
        if len(gold_rows) > 0:
            accuracy = gold_rows['gold_correct'].mean()
            quality_rule = rules['quality_bonus']
            if accuracy >= quality_rule['threshold']:
                quality_amount = base_total * quality_rule['percentage']
                bonus_total += quality_amount
                details['quality_bonus'] = {
                    'accuracy': accuracy,
                    'amount': quality_amount
                }
    if 'volume_bonus' in rules:
        volume_rule = rules['volume_bonus']
        if task_count >= volume_rule['threshold']:
            bonus_total += volume_rule['amount']
            details['volume_bonus'] = volume_rule['amount']

    # Totals and pay per hour of logged working time
    seconds_worked = own_rows['time_seconds'].sum()
    total_payment = base_total + bonus_total
    if seconds_worked > 0:
        effective_hourly = total_payment / (seconds_worked / 3600)
    else:
        effective_hourly = 0
    details.update({
        'worker_id': worker_id,
        'total_tasks': task_count,
        'bonuses': bonus_total,
        'total_payment': total_payment,
        'effective_hourly': effective_hourly
    })
    return details
def generate_payroll(self, annotations: pd.DataFrame,
                     piece_rates: Dict[str, float],
                     period_start: datetime = None,
                     period_end: datetime = None,
                     bonus_structure: dict = None) -> pd.DataFrame:
    """
    Generate payroll for every worker with annotations in the payment period.

    Args:
        annotations: All annotation logs. This method reads the 'timestamp'
            and 'worker_id' columns; the filtered frame is handed to
            calculate_worker_payment for the actual payment math.
        piece_rates: Piece rates by task type
        period_start: Start of payment period (inclusive). When omitted,
            defaults to one payment cycle before period_end.
        period_end: End of payment period (inclusive). When omitted,
            defaults to the current time.
        bonus_structure: Optional bonus configuration forwarded to
            calculate_worker_payment. When omitted, a 20% quality bonus at
            >= 95% gold accuracy and a flat 50.0 volume bonus at >= 1000
            tasks are used. Pass {} to pay no bonuses.

    Returns:
        DataFrame with one row per worker (in order of first appearance in
        the period) holding the payment summary, the period bounds, the
        payment cycle, a 'pending' status and the full payment dict in
        '_full_details'. The same columns are present when no worker has
        annotations in the period.
    """
    # Fill in only the bound(s) the caller left out, so an explicitly
    # supplied start or end is never silently replaced.
    if period_end is None:
        period_end = datetime.now()
    if period_start is None:
        # Any cycle other than weekly/biweekly is treated as monthly (30 days).
        cycle_days = {'weekly': 7, 'biweekly': 14}.get(self.payment_cycle, 30)
        period_start = period_end - timedelta(days=cycle_days)

    if bonus_structure is None:
        bonus_structure = {
            'quality_bonus': {
                'threshold': 0.95,
                'percentage': 0.20
            },
            'volume_bonus': {
                'threshold': 1000,
                'amount': 50.0
            }
        }

    # Filter annotations to period (both bounds inclusive)
    in_period = (
        (annotations['timestamp'] >= period_start) &
        (annotations['timestamp'] <= period_end)
    )
    period_annotations = annotations[in_period]

    # Declared once so empty and non-empty results share the same schema.
    payroll_columns = [
        'worker_id', 'total_tasks', 'base_payment', 'bonuses',
        'total_payment', 'effective_hourly', 'period_start',
        'period_end', 'payment_cycle', 'payment_status',
        '_full_details'
    ]

    payroll_rows = []
    for worker_id in period_annotations['worker_id'].unique():
        payment = self.calculate_worker_payment(
            worker_id,
            period_annotations,
            piece_rates,
            bonus_structure
        )
        payroll_rows.append({
            # Flattened summary fields for the payroll table
            'worker_id': payment['worker_id'],
            'total_tasks': payment['total_tasks'],
            'base_payment': payment['base_payment'],
            'bonuses': payment['bonuses'],
            'total_payment': payment['total_payment'],
            'effective_hourly': payment['effective_hourly'],
            'period_start': period_start,
            'period_end': period_end,
            'payment_cycle': self.payment_cycle,
            'payment_status': 'pending',
            # Full breakdown kept alongside for invoice generation
            '_full_details': payment
        })

    return pd.DataFrame(payroll_rows, columns=payroll_columns)
def generate_invoice(self, worker_id: str, payment_details: dict) -> str:
    """
    Render a worker's payment breakdown as a plain-text invoice.

    Args:
        worker_id: Worker ID printed in the invoice header
        payment_details: Payment calculation dict. 'base_payment',
            'total_payment' and 'effective_hourly' are required;
            'tasks_by_type', 'quality_bonus', 'volume_bonus',
            'period_start' and 'period_end' are optional (a missing
            period bound is shown as 'N/A').

    Returns:
        Invoice as formatted string
    """
    period_from = payment_details.get('period_start', 'N/A')
    period_to = payment_details.get('period_end', 'N/A')
    issued_on = datetime.now().strftime('%Y-%m-%d')

    # Every section starts and ends with a newline, so joining them with ''
    # leaves one blank line between consecutive sections.
    sections = [
        "\n"
        "╔════════════════════════════════════════════════════════════════╗\n"
        "║ ANNOTATION SERVICES INVOICE ║\n"
        "╚════════════════════════════════════════════════════════════════╝\n"
        f"Worker ID: {worker_id}\n"
        f"Payment Period: {period_from} to {period_to}\n"
        f"Invoice Date: {issued_on}\n"
        "────────────────────────────────────────────────────────────────\n"
        "WORK COMPLETED:\n"
    ]

    # One section per task type, in the dict's own order
    for task_type, details in payment_details.get('tasks_by_type', {}).items():
        sections.append(
            f"\n{task_type}:\n"
            f"Tasks Completed: {details['count']}\n"
            f"Rate per Task: ${details['rate']:.3f}\n"
            f"Subtotal: ${details['amount']:.2f}\n"
        )

    sections.append(
        "\n"
        "────────────────────────────────────────────────────────────────\n"
        f"BASE PAYMENT: ${payment_details['base_payment']:.2f}\n"
    )

    # Bonus lines appear only when the corresponding entry is present and truthy
    quality = payment_details.get('quality_bonus')
    if quality:
        sections.append(
            f"\nQUALITY BONUS (Accuracy: {quality['accuracy']:.1%}): ${quality['amount']:.2f}\n"
        )

    volume = payment_details.get('volume_bonus')
    if volume:
        sections.append(f"\nVOLUME BONUS: ${volume:.2f}\n")

    sections.append(
        "\n"
        "────────────────────────────────────────────────────────────────\n"
        f"TOTAL PAYMENT: ${payment_details['total_payment']:.2f}\n"
        f"Effective Hourly Rate: ${payment_details['effective_hourly']:.2f}/hour\n"
        "────────────────────────────────────────────────────────────────\n"
        "Payment will be processed within 5 business days.\n"
        "Questions? Contact: finance@annotationservices.com\n"
        "Thank you for your valuable contributions!\n"
    )

    return "".join(sections)
# Example usage
# Seed NumPy's global RNG so the synthetic annotation logs are reproducible
np.random.seed(42)

# Simulate 150 logged tasks for each of three workers. Roughly 10% of tasks
# are flagged as gold, and timestamps fall within the last seven days.
sample_data = []
for worker_id in ('W001', 'W002', 'W003'):
    for i in range(150):
        sample_data.append(dict(
            worker_id=worker_id,
            task_id=f'T{i:05d}',
            task_type=np.random.choice(['NER', 'Text Classification', 'Bounding Box']),
            time_seconds=np.random.normal(45, 10),
            quality_score=None,
            is_gold=np.random.random() < 0.1,
            gold_correct=np.random.random() < 0.96,
            timestamp=datetime.now() - timedelta(days=np.random.randint(0, 7)),
        ))
annotations_df = pd.DataFrame(sample_data)

# Processor that pays on a weekly cycle
processor = PaymentProcessor(payment_cycle='weekly')

# Per-task pay for each task type
piece_rates = {'NER': 0.18, 'Text Classification': 0.03, 'Bounding Box': 0.10}

# Run payroll over the trailing seven days
payroll = processor.generate_payroll(
    annotations=annotations_df,
    piece_rates=piece_rates,
    period_start=datetime.now() - timedelta(days=7),
    period_end=datetime.now(),
)

print("Payroll Summary:")
print(payroll[[
    'worker_id', 'total_tasks', 'base_payment',
    'bonuses', 'total_payment', 'effective_hourly',
]])
# Generate invoice for top earner
if len(payroll) > 0:
top_earner = payroll.nlargest(1, 'total_payment').iloc[0]
# Get full payment details from the stored details
payment_details = top_earner['_full_details']
payment_details['period_start'] = top_earner['period_start'].strftime('%Y-%m-%d')
payment_details['period_end'] = top_earner['period_end'].strftime('%Y-%m-%d')
invoice = processor.generate_invoice(top_earner['worker_id'], payment_details)
print("\n" + invoice)
else:
print("No payroll data for the period")21 Complete System Architecture
21.1 System Overview
graph TB
subgraph "Client Interface"
ClientAPI[Client API]
ClientDash[Client Dashboard]
end
subgraph "Core Platform"
TaskMgmt[Task Management]
Assignment[Task Assignment Engine]
Workflow[Workflow Orchestrator]
Quality[Quality Control]
end
subgraph "Annotation Interface"
WebUI[Web Annotation UI]
MobileUI[Mobile UI]
API[Annotation API]
end
subgraph "ML Backend"
PreAnnotation[Pre-annotation Models]
ActiveLearning[Active Learning]
Aggregation[Label Aggregation]
end
subgraph "Worker Management"
WorkerDB[(Worker Database)]
Performance[Performance Tracking]
Payment[Payment System]
end
subgraph "Data Storage"
RawData[(Raw Data Storage)]
Annotations[(Annotation Database)]
Backups[(Backup Storage)]
end
subgraph "External Services"
S3[AWS S3]
Stripe[Stripe Payment]
Email[Email Service]
end
ClientAPI --> TaskMgmt
ClientDash --> TaskMgmt
TaskMgmt --> Assignment
Assignment --> Workflow
Workflow --> WebUI
Workflow --> MobileUI
Workflow --> API
WebUI --> Annotations
MobileUI --> Annotations
API --> Annotations
Annotations --> Quality
Quality --> Aggregation
Aggregation --> PreAnnotation
PreAnnotation --> ActiveLearning
Assignment --> WorkerDB
Performance --> WorkerDB
Payment --> WorkerDB
Payment --> Stripe
TaskMgmt --> RawData
Annotations --> RawData
RawData --> Backups
Annotations --> Backups
RawData --> S3
Backups --> S3
Workflow --> Email
21.2 Technology Stack
Code
import pandas as pd
# Reference technology stack for the annotation platform, one row per
# component: (component, chosen technology, purpose, alternatives).
tech_stack = pd.DataFrame(
    [
        ('Web Framework', 'Django / FastAPI',
         'Backend application framework', 'Flask, Node.js'),
        ('API Framework', 'FastAPI',
         'RESTful API endpoints', 'Django REST Framework'),
        ('Database (Primary)', 'PostgreSQL',
         'Relational data storage', 'MySQL, MongoDB'),
        ('Database (Cache)', 'Redis',
         'Session cache, task queue', 'Memcached'),
        ('Task Queue', 'Celery + RabbitMQ',
         'Async task processing', 'Apache Airflow, AWS SQS'),
        ('File Storage', 'AWS S3 / MinIO',
         'Media file storage', 'Google Cloud Storage, Azure'),
        ('ML Framework', 'PyTorch / TensorFlow',
         'Model training/inference', 'JAX, Scikit-learn'),
        ('Frontend', 'React + TypeScript',
         'Annotation interface', 'Vue.js, Angular'),
        ('Authentication', 'Auth0 / Django Auth',
         'User authentication', 'Keycloak, Firebase Auth'),
        ('Monitoring', 'Prometheus + Grafana',
         'System health monitoring', 'Datadog, New Relic'),
        ('Analytics', 'Mixpanel / Amplitude',
         'User behavior analytics', 'Google Analytics'),
        ('Payment Processing', 'Stripe',
         'Worker payments', 'PayPal, Square'),
        ('Email Service', 'SendGrid / Mailgun',
         'Automated emails', 'Amazon SES'),
        ('Deployment', 'Docker + Kubernetes',
         'Container orchestration', 'AWS ECS, Google GKE'),
        ('Load Balancer', 'Nginx',
         'Traffic distribution', 'HAProxy, AWS ALB'),
    ],
    columns=['Component', 'Technology', 'Purpose', 'Alternatives'],
)
# Bare expression so a notebook cell displays the table
tech_stack