# Office File Locator (OFL)
A production-grade, government-ready Physical File Tracking System built by Team Aatmanova. OFL digitally mirrors every physical file's journey through government departments — from creation through multiple transfers to final archival — enforcing accountability at every step.
## System Architecture
### Tech Stack
| Layer | Technology |
|---|---|
| Backend | FastAPI 0.115 + Uvicorn |
| ORM | SQLAlchemy 2.0 (Async) + asyncpg |
| Database | PostgreSQL 16 |
| Migrations | Alembic 1.13 |
| Auth | python-jose (JWT) + passlib (bcrypt) |
| Task Queue | Celery 5.4 + Celery Beat |
| Message Broker | Redis 7 |
| Object Storage | MinIO (S3-compatible) |
| Frontend | React 18 + TypeScript + Vite |
| UI Components | shadcn/ui + Radix UI |
| State Management | Zustand |
| Data Fetching | TanStack Query |
| Container | Docker Compose v2 |
| Testing | Pytest + pytest-asyncio |
## Core Features
### File Lifecycle Management
OFL implements a comprehensive state machine for physical file tracking:
- Created: File enters system with auto-generated number (FILE-2026-00001)
- In-Transit: File assigned but not yet physically received
- Assigned: Current holder confirmed physically holding the file
- Unassigned: File in pool, available for re-assignment
- Completed: Archived, read-only state
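
The states above can be sketched as an explicit transition table. This is a minimal illustration rather than OFL's actual service code: the enum values, the `ALLOWED_TRANSITIONS` mapping, and the `transition` helper are assumptions inferred from the state descriptions and the permissions listed in this README.

```python
from enum import Enum


class FileStatus(str, Enum):
    # String values are assumed; only the state names come from the README.
    CREATED = "created"
    IN_TRANSIT = "in_transit"
    ASSIGNED = "assigned"
    UNASSIGNED = "unassigned"
    COMPLETED = "completed"


# Hypothetical transition table inferred from the state descriptions.
ALLOWED_TRANSITIONS: dict[FileStatus, set[FileStatus]] = {
    FileStatus.CREATED: {FileStatus.IN_TRANSIT, FileStatus.UNASSIGNED},
    FileStatus.UNASSIGNED: {FileStatus.IN_TRANSIT},
    FileStatus.IN_TRANSIT: {FileStatus.ASSIGNED},
    FileStatus.ASSIGNED: {
        FileStatus.IN_TRANSIT,   # transferred to another user
        FileStatus.UNASSIGNED,   # returned to the pool
        FileStatus.COMPLETED,    # archived
    },
    FileStatus.COMPLETED: {FileStatus.UNASSIGNED},  # reopened
}


class InvalidTransition(Exception):
    """Raised when a status change is not in the transition table."""


def transition(current: FileStatus, target: FileStatus) -> FileStatus:
    """Return the target status if the move is allowed, else raise."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise InvalidTransition(f"{current.value} -> {target.value} is not allowed")
    return target
```

Keeping the rules in one table means a service function only has to call `transition(file.status, new_status)` before writing, and an illegal jump such as `IN_TRANSIT` to `COMPLETED` fails in one place.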
### Optimistic Locking
Every file update uses optimistic locking to prevent race conditions:

```python
class File(Base):
    __tablename__ = "files"

    id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    file_number: Mapped[str] = mapped_column(String(50), unique=True, nullable=False)
    name: Mapped[str] = mapped_column(String(200), nullable=False)
    status: Mapped[FileStatus] = mapped_column(SQLEnum(FileStatus), default=FileStatus.UNASSIGNED)
    opt_version: Mapped[int] = mapped_column(Integer, default=0)


async def update_file(db: AsyncSession, user_id: UUID, file_id: UUID, data: FileUpdate) -> File:
    file = await FileService.get_file_by_id(db, file_id)
    if file.opt_version != data.opt_version:
        raise FileLocked()  # Concurrent modification detected
    file.opt_version += 1
    await db.commit()
    return file
```

### Transfer Workflow
Every file transfer requires mandatory remarks and maintains a complete audit trail:

```python
class FileTransfer(Base):
    __tablename__ = "file_transfers"

    id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    file_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("files.id"), nullable=False)
    from_user_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("users.id"), nullable=False)
    to_user_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("users.id"), nullable=False)
    remarks: Mapped[str] = mapped_column(String(500), nullable=False)
    transfer_date: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
    receipt_confirmed: Mapped[bool] = mapped_column(Boolean, default=False)
    receipt_confirmed_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
```

### Physical Receipt Confirmation
Two-step verification prevents ghost assignments:
- Assign → File moves to IN_TRANSIT status
- Confirm Receipt → Holder explicitly confirms holding the physical file → File becomes ASSIGNED
This prevents situations where a file shows as "assigned" but the physical copy never reached the intended holder.
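
The two steps can be sketched with a small in-memory model. This is an illustration under assumptions, not the project's `TransferService`: the `TrackedFile` dataclass and its `pending_holder_id` field are hypothetical names introduced here to show why only the intended recipient can complete the hand-off.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TrackedFile:
    status: str = "UNASSIGNED"
    current_holder_id: Optional[str] = None  # who physically holds the file
    pending_holder_id: Optional[str] = None  # hypothetical: who it was sent to


def assign(file: TrackedFile, to_user_id: str) -> None:
    """Step 1: record the hand-off; the file is now in transit."""
    file.status = "IN_TRANSIT"
    file.pending_holder_id = to_user_id


def confirm_receipt(file: TrackedFile, user_id: str) -> None:
    """Step 2: only the intended recipient may confirm physical receipt."""
    if file.status != "IN_TRANSIT":
        raise ValueError("File is not in transit")
    if file.pending_holder_id != user_id:
        raise PermissionError("Only the intended recipient can confirm receipt")
    file.status = "ASSIGNED"
    file.current_holder_id = user_id
    file.pending_holder_id = None
```

Until `confirm_receipt` succeeds, `current_holder_id` is not updated, so a report on who holds a file does not list someone who never received it.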
### Cross-Department File Requests
Users can request any visible file from its current holder:

```python
class FileRequest(Base):
    __tablename__ = "file_requests"

    id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    file_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("files.id"), nullable=False)
    requester_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("users.id"), nullable=False)
    status: Mapped[RequestStatus] = mapped_column(SQLEnum(RequestStatus), default=RequestStatus.PENDING)
    reason: Mapped[str] = mapped_column(String(500), nullable=False)
    created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
```

Request workflow: PENDING → APPROVED (file transferred) / REJECTED (requester notified)
### Return to Pool
Users can release files without admin intervention; the file returns to the unassigned pool:

```python
async def return_file_to_pool(db: AsyncSession, user_id: UUID, file_id: UUID) -> File:
    file = await FileService.get_file_by_id(db, file_id)
    if file.current_holder_id != user_id:
        raise PermissionError("Only current holder can return file")
    file.status = FileStatus.UNASSIGNED
    file.current_holder_id = None
    file.opt_version += 1
    # AuditLog declares target_type and target_id as non-nullable,
    # so the returned file is recorded as the target.
    await AuditLogger.log_action(
        db,
        actor_id=user_id,
        action="FILE_RETURN_TO_POOL",
        target_type="FILE",
        target_id=file.id,
    )
    await db.commit()
    return file
```

## Access Control
### Dynamic RBAC System
OFL implements a fully dynamic role-based access control system: roles and permissions are database-driven, not hard-coded.
#### Default Roles
| Role | Description |
|---|---|
| Super Admin | Full system control; cannot be deleted |
| Department Head | Manages department files; receives level-2 SLA escalations |
| Section Officer | Assign, transfer, approve/reject requests |
| Clerk | Create files, confirm receipt, request files, return to pool |
| Viewer | Read-only access to permitted files |
#### Granular Permissions (20+)
| Permission | Description |
|---|---|
| `create_file` | Create new file records |
| `assign_file` | Assign unassigned files to users |
| `transfer_own_file` | Transfer a file you currently hold |
| `return_own_file` | Return a held file to the unassigned pool |
| `request_file` | Request any visible file from its current holder |
| `approve_request` | Approve or reject incoming file requests |
| `confirm_receipt` | Confirm physical receipt of a file in transit |
| `upload_attachment` | Upload attachments to visible files |
| `download_attachment` | Download attachments from permitted files |
| `complete_file` | Mark a file as completed/archived |
| `reopen_file` | Reopen a completed file |
| `manage_users` | Create, edit, and disable user accounts |
| `manage_roles` | Create, edit, delete roles and their permissions |
| `manage_departments` | Create, edit, delete departments |
| `view_all_files` | Bypass sensitivity restrictions |
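
Because roles carry their permissions as data, a permission check reduces to a membership test. The sketch below is a simplified stand-in, not the project's `require_permission` dependency: the `Role` and `User` dataclasses, `has_permission`, and `require` are hypothetical, and the Clerk permission list is one reading of the role description in the Default Roles table.

```python
from dataclasses import dataclass, field


@dataclass
class Role:
    name: str
    permissions: list[str] = field(default_factory=list)


@dataclass
class User:
    name: str
    role: Role


class PermissionDenied(Exception):
    """Raised when a user's role does not include a permission."""


def has_permission(user: User, permission: str) -> bool:
    """True if the user's role lists the permission."""
    return permission in user.role.permissions


def require(user: User, permission: str) -> None:
    """Raise PermissionDenied unless the user holds the permission."""
    if not has_permission(user, permission):
        raise PermissionDenied(f"{user.name} lacks '{permission}'")


# Assumed mapping of the Clerk role onto the permission names above.
clerk = Role(
    "Clerk",
    ["create_file", "confirm_receipt", "request_file", "return_own_file"],
)
```

Editing `clerk.permissions` (or the corresponding database row) changes what the role can do without touching code, which is the point of keeping permissions database-driven.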
### Sensitivity-Based Visibility
Files have sensitivity levels that determine visibility:

```python
class FileSensitivity(str, Enum):
    PUBLIC = "public"              # Visible to all users
    DEPARTMENT = "department"      # Visible to same department only
    CONFIDENTIAL = "confidential"  # Strict access control
```

### JWT Authentication
HTTP-only cookie-based JWT with access + refresh tokens:

```python
class Settings(BaseSettings):
    SECRET_KEY: str = Field(default="yoursecretkeyhere")
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 15
    REFRESH_TOKEN_EXPIRE_DAYS: int = 7
    HTTPS_ONLY: bool = False  # Enforce HTTPS-only cookies in production


async def login(db: AsyncSession, credentials: LoginCredentials) -> TokenPair:
    user = await authenticate_user(credentials.email, credentials.password)
    access_token = create_access_token(
        data={"sub": str(user.id), "permissions": user.permission_list}
    )
    refresh_token = create_refresh_token(
        data={"sub": str(user.id)}
    )
    response = TokenPair(access_token=access_token, refresh_token=refresh_token)
    set_cookie(response, "access_token", access_token, ACCESS_TOKEN_EXPIRE_MINUTES * 60)
    return response
```

## Automation & Notifications
### SLA Escalation Engine
Celery Beat runs daily cron checks for overdue files:

```python
@celery_app.task(name="check_file_sla")
def check_file_sla():
    # Level 1: Notify holder (overdue > 0 days)
    # Level 2: Notify Department Head (overdue > 3 days)
    # Level 3: Notify Admin via email (overdue > 7 days)
    # `db` (a synchronous session) and `admin` are assumed to come from
    # the surrounding module.
    today = date.today()
    overdue_files = db.query(File).filter(
        File.status == FileStatus.ASSIGNED,
        File.due_date < today
    ).all()
    for file in overdue_files:
        days_overdue = (today - file.due_date).days
        if days_overdue > 7:
            send_escalation_email(admin, file, level=3)
        elif days_overdue > 3:
            notify_department_head(file, level=2)
        else:
            notify_holder(file, level=1)
```

### Real-time Notifications
WebSocket push notifications for instant updates:

```python
@router.websocket("/ws/notifications")
async def websocket_notifications(websocket: WebSocket, token: str = WebSocketAuth()):
    await websocket.accept()
    user_id = decode_token(token)
    subscription = subscribe_user(user_id)
    try:
        while True:
            notification = await subscription.get()
            await websocket.send_json(notification)
    finally:
        unsubscribe_user(user_id)
```

Frontend implementation:

```typescript
const useNotifications = () => {
  const { data } = useQuery({
    queryKey: ['notifications'],
    queryFn: () => api.get('/api/v1/notifications'),
    refetchInterval: 30000,
  });
  useEffect(() => {
    // Use wss:// when the page is served over HTTPS; browsers block ws:// there.
    const scheme = window.location.protocol === 'https:' ? 'wss' : 'ws';
    const ws = new WebSocket(`${scheme}://${window.location.host}/api/v1/ws/notifications`);
    ws.onmessage = (event) => {
      const notification = JSON.parse(event.data);
      toast.success(notification.message);
    };
    return () => ws.close();
  }, []);
  return data;
};
```

### Email Alerts
SMTP integration for:
- New file assignments
- Transfer requests
- Overdue file notifications
- SLA escalation alerts
## File Attachments
### MinIO Object Storage
Secure attachment storage with sensitivity-aware access:

```python
class FileAttachment(Base):
    __tablename__ = "file_attachments"

    id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    file_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("files.id"), nullable=False)
    original_filename: Mapped[str] = mapped_column(String(255), nullable=False)
    storage_key: Mapped[str] = mapped_column(String(500), nullable=False)
    mime_type: Mapped[str] = mapped_column(String(100), nullable=False)
    size_bytes: Mapped[int] = mapped_column(Integer, nullable=False)
    uploaded_by_user_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("users.id"), nullable=False)
    uploaded_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())


async def upload_attachment(
    db: AsyncSession,
    file: UploadFile,
    file_id: UUID,
    user_id: UUID
) -> FileAttachment:
    # Validate file type and size (max 20 MB)
    if file.size > 20 * 1024 * 1024:
        raise FileTooLarge()
    allowed_types = ["application/pdf", "application/msword",
                     # DOCX, listed under Supported Formats
                     "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
                     "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                     "image/png", "image/jpeg"]
    if file.content_type not in allowed_types:
        raise InvalidFileType()
    # Upload to MinIO
    storage_key = f"attachments/{file_id}/{uuid.uuid4()}-{file.filename}"
    await minio_client.put_object(
        bucket=settings.MINIO_BUCKET,
        object_name=storage_key,
        data=file.file,
        length=file.size,
        content_type=file.content_type
    )
    # Create database record
    attachment = FileAttachment(
        file_id=file_id,
        original_filename=file.filename,
        storage_key=storage_key,
        mime_type=file.content_type,
        size_bytes=file.size,
        uploaded_by_user_id=user_id
    )
    db.add(attachment)  # Without this the commit would not persist the record
    await db.commit()
    return attachment
```

### Supported Formats
- PDF, DOCX, XLSX (max 20 MB each)
- PNG, JPEG images (max 20 MB each)
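
The format and size rules above can be isolated into small helpers that are easy to unit test. This is a sketch under assumptions: `validate_upload` and `safe_storage_key` are hypothetical names, the MIME strings are the standard ones for the listed formats, and sanitising the filename before building the object key is a suggestion, not something the README states OFL does.

```python
import re
import uuid

MAX_SIZE_BYTES = 20 * 1024 * 1024  # 20 MB limit stated above

# Standard MIME types for the supported formats.
ALLOWED_TYPES = {
    "application/pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",  # DOCX
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",        # XLSX
    "image/png",
    "image/jpeg",
}


def validate_upload(content_type: str, size_bytes: int) -> None:
    """Raise ValueError if the upload breaks the size or type rules."""
    if size_bytes > MAX_SIZE_BYTES:
        raise ValueError("File exceeds the 20 MB limit")
    if content_type not in ALLOWED_TYPES:
        raise ValueError(f"Unsupported content type: {content_type}")


def safe_storage_key(file_id: uuid.UUID, filename: str) -> str:
    """Build an object key from the last path component of the filename."""
    base = filename.replace("\\", "/").split("/")[-1]
    # Replace anything outside a conservative character set.
    cleaned = re.sub(r"[^A-Za-z0-9._-]", "_", base) or "unnamed"
    return f"attachments/{file_id}/{uuid.uuid4()}-{cleaned}"
```

A client-supplied name such as `../../etc/passwd` then contributes only `passwd` to the key, so the object always stays under the file's own `attachments/<file_id>/` prefix.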
## API Endpoints
| Tag | Prefix | Description |
|---|---|---|
| Authentication | /api/v1/auth | Login, logout, token refresh |
| Files | /api/v1/files | CRUD, search, status management |
| Transfers | /api/v1/transfers | Assign, transfer, return, confirm receipt |
| Requests | /api/v1/requests | File request workflow |
| Notifications | /api/v1/notifications | Fetch & mark-read notifications |
| Attachments | /api/v1/attachments | Upload, download, delete attachments |
| Analytics | /api/v1/analytics | Dashboard metrics |
| Reports | /api/v1/reports | Distribution, movement, overdue reports |
| Administration | /api/v1/admin | Users, roles, departments, categories |
| WebSocket | /api/v1/ws | Real-time notification stream |
### Sample Endpoints

```python
@router.post("/files", tags=["Files"])
async def create_file(
    file_data: FileCreate,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Create a new file record"""
    file = await FileService.create_file(db, current_user.id, file_data)
    return file


@router.post("/transfers", tags=["Transfers"])
async def transfer_file(
    transfer_data: TransferCreate,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Transfer a file to another user"""
    transfer = await TransferService.create_transfer(
        db, current_user.id, transfer_data
    )
    return transfer


@router.post("/transfers/{transfer_id}/confirm", tags=["Transfers"])
async def confirm_receipt(
    transfer_id: UUID,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Confirm physical receipt of file in transit"""
    file = await TransferService.confirm_receipt(db, current_user.id, transfer_id)
    return file
```

## Database Schema
### Core Models

```python
class User(Base):
    __tablename__ = "users"

    id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False)
    name: Mapped[str] = mapped_column(String(100), nullable=False)
    department_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("departments.id"), nullable=False)
    role_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("roles.id"), nullable=False)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())


class Department(Base):
    __tablename__ = "departments"

    id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    name: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
    code: Mapped[str] = mapped_column(String(20), unique=True, nullable=False)
    created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())


class Role(Base):
    __tablename__ = "roles"

    id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    name: Mapped[str] = mapped_column(String(50), unique=True, nullable=False)
    permissions: Mapped[list[str]] = mapped_column(JSON, default=list)
    is_system: Mapped[bool] = mapped_column(Boolean, default=False)


class FileCategory(Base):
    __tablename__ = "file_categories"

    id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    name: Mapped[str] = mapped_column(String(100), nullable=False)
    code: Mapped[str] = mapped_column(String(20), nullable=False)
    default_sla_days: Mapped[int] = mapped_column(Integer, default=30)
    created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())


class AuditLog(Base):
    __tablename__ = "audit_logs"

    id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    actor_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("users.id"), nullable=False)
    action: Mapped[str] = mapped_column(String(50), nullable=False)
    target_type: Mapped[str] = mapped_column(String(50), nullable=False)
    target_id: Mapped[uuid.UUID] = mapped_column(nullable=False)
    # "metadata" is a reserved attribute name on Declarative models, so the
    # attribute is renamed while the column keeps the name "metadata".
    event_metadata: Mapped[dict] = mapped_column("metadata", JSON, default=dict)
    created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
```

### Database Indexes

```python
# File indexes for query performance
__table_args__ = (
    Index('ix_files_status', 'status'),
    Index('ix_files_category_id', 'category_id'),
    Index('ix_files_current_holder_id', 'current_holder_id'),
    Index('ix_files_created_date', 'created_date'),
    Index('ix_files_file_number', 'file_number'),
    Index('ix_files_search', 'file_number', 'name'),  # Composite for text search
)
```

## Admin & Reporting
### User Management
Full CRUD with audit logging:

```python
@router.post("/users", tags=["Administration"])
async def create_user(
    user_data: UserCreate,
    current_user: User = Depends(require_permission("manage_users")),
    db: AsyncSession = Depends(get_db)
):
    """Create a new user with role assignment"""
    user = await AdminService.create_user(db, user_data)
    await AuditLogger.log_action(
        db,
        actor_id=current_user.id,
        action="USER_CREATE",
        target_type="USER",
        target_id=user.id
    )
    return user


@router.delete("/users/{user_id}", tags=["Administration"])
async def deactivate_user(
    user_id: UUID,
    current_user: User = Depends(require_permission("manage_users")),
    db: AsyncSession = Depends(get_db)
):
    """Soft-delete (deactivate) a user account"""
    await AdminService.deactivate_user(db, user_id)
    return {"status": "deactivated"}
```

### Analytics Dashboard
Real-time metrics:

```typescript
interface DashboardMetrics {
  totalFiles: number;
  activeFiles: number;
  completedFiles: number;
  overdueFiles: number;
  filesByDepartment: DepartmentStats[];
  fileMovementTrend: TrendData[];
  topHolders: UserStats[];
}
```

```python
@router.get("/analytics/dashboard", tags=["Analytics"])
async def get_dashboard_metrics(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
) -> DashboardMetrics:
    """Get aggregate metrics for dashboard display"""
    total = await db.execute(select(func.count(File.id)))
    active = await db.execute(
        select(func.count(File.id)).where(File.status == FileStatus.ASSIGNED)
    )
    overdue = await db.execute(
        select(func.count(File.id)).where(
            File.status == FileStatus.ASSIGNED,
            File.due_date < date.today()
        )
    )
    return DashboardMetrics(
        totalFiles=total.scalar(),
        activeFiles=active.scalar(),
        overdueFiles=overdue.scalar()
    )
```

### Reports Module

```python
from datetime import date, timedelta


@router.get("/reports/distribution", tags=["Reports"])
async def get_department_distribution(
    current_user: User = Depends(require_permission("view_reports")),
    db: AsyncSession = Depends(get_db)
):
    """Files distributed across departments"""
    query = (
        select(
            Department.name,
            func.count(File.id).label("file_count")
        )
        .select_from(File)  # Make the join root explicit
        .join(User, File.current_holder_id == User.id)
        .join(Department, User.department_id == Department.id)
        .group_by(Department.id)
    )
    result = await db.execute(query)
    return [{"department": row.name, "count": row.file_count} for row in result]


@router.get("/reports/movement", tags=["Reports"])
async def get_movement_log(
    start_date: date,
    end_date: date,
    current_user: User = Depends(require_permission("view_reports")),
    db: AsyncSession = Depends(get_db)
):
    """File movement history for date range"""
    query = (
        select(FileTransfer)
        .where(
            FileTransfer.transfer_date >= start_date,
            # transfer_date is a timestamp, so compare against the start of
            # the following day to include all of end_date.
            FileTransfer.transfer_date < end_date + timedelta(days=1)
        )
        .order_by(FileTransfer.transfer_date.desc())
    )
    result = await db.execute(query)
    return result.scalars().all()
```

## Project Structure

```
ofl/
├── backend/                              # FastAPI application
│   ├── app/
│   │   ├── api/v1/                       # Route handlers
│   │   │   ├── auth.py                   # Login, logout, refresh
│   │   │   ├── files.py                  # File CRUD
│   │   │   ├── transfers.py              # Transfer workflow
│   │   │   ├── requests.py               # Cross-dept requests
│   │   │   ├── attachments.py            # MinIO upload/download
│   │   │   ├── notifications.py          # Notification management
│   │   │   ├── analytics.py              # Dashboard metrics
│   │   │   ├── reports.py                # Report generation
│   │   │   ├── admin.py                  # User/role/dept management
│   │   │   ├── users.py                  # User profiles
│   │   │   ├── categories.py             # File categories
│   │   │   └── websocket.py              # Real-time notifications
│   │   ├── core/                         # Settings, security, exceptions
│   │   │   ├── config.py                 # Environment configuration
│   │   │   ├── security.py               # JWT, password hashing
│   │   │   ├── exceptions.py             # Custom exception handlers
│   │   │   └── dependencies.py           # FastAPI dependency injection
│   │   ├── db/                           # SQLAlchemy models
│   │   │   ├── models/                   # Database models
│   │   │   │   ├── file.py               # File model with optimistic locking
│   │   │   │   ├── user.py               # User with department/role
│   │   │   │   ├── department.py         # Department model
│   │   │   │   ├── role.py               # Role with JSON permissions
│   │   │   │   ├── file_transfer.py      # Transfer history
│   │   │   │   ├── file_request.py       # Cross-dept requests
│   │   │   │   ├── file_attachment.py    # MinIO attachment ref
│   │   │   │   ├── audit.py              # Immutable audit trail
│   │   │   │   └── enums.py              # FileStatus, RequestStatus
│   │   │   ├── session.py                # Async session factory
│   │   │   └── utils/seed.py             # Database seeder
│   │   ├── schemas/                      # Pydantic request/response models
│   │   ├── services/                     # Business logic layer
│   │   │   ├── file_service.py           # File CRUD with version check
│   │   │   ├── transfer_service.py       # Transfer workflow
│   │   │   ├── request_service.py        # Request approval logic
│   │   │   ├── attachment_service.py     # MinIO operations
│   │   │   ├── notification_service.py   # WebSocket notifications
│   │   │   ├── sla_service.py            # SLA calculation
│   │   │   ├── report_service.py         # Report queries
│   │   │   └── analytics_service.py      # Dashboard aggregations
│   │   ├── tasks/                        # Celery tasks
│   │   │   ├── celery_app.py             # Celery configuration
│   │   │   └── file_tasks.py             # SLA, email tasks
│   │   └── utils/                        # Helpers
│   │       ├── storage.py                # MinIO client wrapper
│   │       ├── notifications.py          # Email/SMS helpers
│   │       ├── audit.py                  # Audit logging utility
│   │       └── pagination.py             # Offset/limit pagination
│   ├── alembic/                          # Database migrations
│   ├── tests/                            # Pytest suite
│   ├── requirements.txt                  # Python dependencies
│   ├── pyproject.toml                    # Ruff + Mypy config
│   └── Dockerfile
│
├── frontend/                             # React + TypeScript SPA
│   ├── src/
│   │   ├── api/                          # Axios API client
│   │   ├── components/                   # shadcn/ui components
│   │   ├── pages/                        # Page components
│   │   │   ├── Dashboard.tsx             # Analytics dashboard
│   │   │   ├── FileList.tsx              # File listing with filters
│   │   │   ├── FileDetail.tsx            # Single file view + history
│   │   │   ├── FileTransfer.tsx          # Transfer form
│   │   │   ├── FileRequest.tsx           # Request workflow
│   │   │   ├── Attachments.tsx           # Attachment manager
│   │   │   ├── Reports.tsx               # Report viewer
│   │   │   └── AdminPanel.tsx            # User/role/dept management
│   │   ├── hooks/                        # Custom React hooks
│   │   ├── store/                        # Zustand stores
│   │   ├── router/                       # React Router config
│   │   ├── types/                        # TypeScript interfaces
│   │   └── App.tsx
│   ├── package.json
│   └── Dockerfile
│
├── docs/                                 # Documentation
│   ├── admin-guide.md
│   ├── user-manual.md
│   ├── deployment-guide.md
│   └── maintenance-guide.md
│
├── docker-compose.yml                    # Full stack orchestration
├── nginx.conf                            # Reverse proxy config
├── Makefile                              # Development tasks
└── README.md
```

## Deployment
### Docker Compose (Recommended)

```bash
# First-time setup
make setup

# Start all services
make start

# Access points:
#   http://localhost            Web Application
#   http://localhost/api/docs   Swagger UI
#   http://localhost:9001       MinIO Console
```

### Default Credentials
| Field | Value |
|---|---|
| Email | [email protected] |
| Password | ChangeMe@1234 |
⚠️ Security: Change default passwords before production deployment.
### Environment Variables
| Variable | Default | Description |
|---|---|---|
| `DATABASE_URL` | PostgreSQL connection string | Async PostgreSQL URL |
| `SECRET_KEY` | Random 32 hex | JWT signing key |
| `ACCESS_TOKEN_EXPIRE_MINUTES` | 15 | JWT access token TTL |
| `REDIS_URL` | Redis connection string | Celery broker |
| `MINIO_ENDPOINT` | localhost:9000 | MinIO server |
| `SMTP_HOST` | SMTP server | Email notifications |
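
One way to guard against shipping the placeholder signing key is to validate the environment at startup. The sketch below uses only the standard library and is not the project's `Settings` class: `RuntimeSettings`, `load_settings`, and `generate_secret_key` are hypothetical names, and since the table does not say whether "Random 32 hex" means 32 bytes or 32 characters, the generator here assumes 32 random bytes (64 hex characters).

```python
import os
import secrets
from dataclasses import dataclass
from typing import Mapping

# The default value shown in the Settings class earlier in this README.
PLACEHOLDER_SECRET = "yoursecretkeyhere"


@dataclass(frozen=True)
class RuntimeSettings:
    database_url: str
    secret_key: str
    access_token_expire_minutes: int


def load_settings(env: Mapping[str, str] = os.environ) -> RuntimeSettings:
    """Read settings from the environment, rejecting the placeholder key."""
    secret = env.get("SECRET_KEY", PLACEHOLDER_SECRET)
    if secret == PLACEHOLDER_SECRET:
        raise RuntimeError("SECRET_KEY must be set to a random value")
    return RuntimeSettings(
        database_url=env["DATABASE_URL"],
        secret_key=secret,
        access_token_expire_minutes=int(env.get("ACCESS_TOKEN_EXPIRE_MINUTES", "15")),
    )


def generate_secret_key() -> str:
    """Return 32 random bytes encoded as 64 hexadecimal characters."""
    return secrets.token_hex(32)
```

Calling `load_settings()` once during application start-up makes a missing or placeholder key fail fast instead of silently signing tokens with a known value.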
## Testing

```bash
# Backend tests with coverage
cd backend && pytest --cov=app --cov-report=html

# Linting
make lint-backend   # Ruff + Mypy
make lint-frontend  # ESLint

# Run specific test
pytest tests/test_files.py -v
```

## Key Design Decisions
### Why Optimistic Locking?
Government offices often have multiple clerks working on the same files concurrently. Optimistic locking (opt_version field) prevents race conditions without requiring distributed locks:
- No blocking when file is not contested
- Clear error when concurrent modification detected
- Simple to implement and debug
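
A version check is only race-free if the comparison and the increment happen in one statement. The sketch below shows that compare-and-swap pattern using the standard-library `sqlite3` module so it runs on its own; OFL itself uses PostgreSQL through SQLAlchemy, and `rename_file`, `StaleVersion`, and the trimmed-down `files` table are hypothetical names for illustration.

```python
import sqlite3


class StaleVersion(Exception):
    """The row changed (or vanished) since the caller last read it."""


def rename_file(conn: sqlite3.Connection, file_id: int, new_name: str,
                expected_version: int) -> int:
    """Update the row only if opt_version still matches; return the new version."""
    cur = conn.execute(
        "UPDATE files SET name = ?, opt_version = opt_version + 1 "
        "WHERE id = ? AND opt_version = ?",
        (new_name, file_id, expected_version),
    )
    if cur.rowcount == 0:
        # No row matched: another writer got there first, or the id is unknown.
        conn.rollback()
        raise StaleVersion(f"file {file_id} is no longer at version {expected_version}")
    conn.commit()
    return expected_version + 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE files (id INTEGER PRIMARY KEY, name TEXT, opt_version INTEGER DEFAULT 0)"
)
conn.execute("INSERT INTO files (id, name) VALUES (1, 'Budget 2026')")
conn.commit()
```

Two clerks who both read version 0 can both call `rename_file(conn, 1, ..., 0)`, but only the first call matches a row; the second raises `StaleVersion` and the caller can reload and retry.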
### Why Two-Step Receipt?
Traditional systems assume "assigned" means "received". OFL enforces explicit receipt confirmation to prevent:
- Ghost assignments (file shows assigned but never physically received)
- Accountability gaps
- Lost files with no trace
### Why Celery for SLA?
SLA checks don't need real-time processing:
- Daily cron is sufficient for escalation
- Celery provides retry mechanism for failed notifications
- Beat scheduler handles recurring tasks reliably
- Workers scale independently of API server
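
Keeping the escalation rule as a pure function lets the Celery task stay thin and makes the thresholds testable without a broker or database. The function below mirrors the levels described in the SLA Escalation Engine section (holder after the due date, Department Head after 3 days, Admin after 7); the name `escalation_level` and the `None` return for files that are not overdue are assumptions.

```python
from datetime import date
from typing import Optional


def escalation_level(due_date: date, today: date) -> Optional[int]:
    """Map days overdue to an escalation level, or None if not overdue."""
    days_overdue = (today - due_date).days
    if days_overdue <= 0:
        return None
    if days_overdue > 7:
        return 3  # Admin, by email
    if days_overdue > 3:
        return 2  # Department Head
    return 1      # Current holder
```

A daily task can then loop over assigned files, call `escalation_level(file.due_date, date.today())`, and dispatch the matching notification.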
### Why MinIO?
Government files require secure, long-term storage:
- S3-compatible API for future migration flexibility
- Self-hosted (no cloud dependencies)
- Supports lifecycle policies for archival
- Supports server-side encryption at rest (once a key management service is configured)
## Achievement Summary
OFL successfully replaced paper-based file registers in multiple government offices with:
- 100% accountability: Every transfer logged with timestamp and remarks
- Real-time visibility: Dashboard shows live file status across departments
- Zero admin bottlenecks: Clerks can return files to pool without supervisor
- SLA enforcement: Automatic escalation prevents file stagnation
- 7-year audit trail: Immutable records satisfy compliance requirements
- Docker deployment: Single-command setup for rapid onboarding
## Architecture Feedback
Spotted a potential optimization or antipattern? Let me know.