import { filter, startWith, take, tap } from 'rxjs/operators';
import { firstValueFrom, Subject } from 'rxjs'

export interface SemaphoreLock {
	release(): void
	type: 'read' | 'exclusive'
	forContext?: string
	obtainedAt: number
	warnIfHeldLongerTimeout?: any
}

interface ObtainLockOptions {
	forContext?: string | (() => string)
	type?: SemaphoreLock['type']
	timeoutMs?: number
}

export class Semaphore {
	static debugLevel: 0 | 1 | 2 = 0
	static warnIfHeldLongerThanMs = 5_000
	static timeoutAndRejectRequestAfterMs = 0

	private grantedLocks: SemaphoreLock[] = [];
	private lockQueue: SemaphoreLock[] = [];
	
	private queueChangedSubject = new Subject<void>()
	private nextLocksGrantedSubject = new Subject<void>()

	constructor(private defaultObtainOptions?: ObtainLockOptions) {}

	async obtainLock(options?: ObtainLockOptions): Promise<SemaphoreLock> {
		if(Semaphore.debugLevel >= 2) {
			if(this.lockQueue.length) {
				console.log('Lock queue size:', this.lockQueue.length)
			}
		}
		let { type, timeoutMs, forContext } = {
			type: 'exclusive',
			timeoutMs: Semaphore.timeoutAndRejectRequestAfterMs,
			...this.defaultObtainOptions,
			...options,
		} as const
		if(typeof forContext === 'function') forContext = forContext()

		if(Semaphore.debugLevel) {
			const requestedExclusiveLocks = this.lockQueue.filter(l => l.type === 'exclusive')
			const requestedReadLocks = this.lockQueue.filter(l => l.type === 'read')
			if(type == 'exclusive') {
				console.log(`??# Semaphore of type ${type} requested${forContext ? ` for ${forContext}` : ''}; ${requestedExclusiveLocks.length} requested exclusive locks, ${requestedReadLocks.length} requested read locks`)
			} else {
				console.log(`??+ Semaphore of type ${type} requested${forContext ? ` for ${forContext}` : ''}; ${requestedExclusiveLocks.length} requested exclusive locks, ${requestedReadLocks.length} requested read locks`)
			}
		}

		const semaphore = this
		const lock: SemaphoreLock = {
			release() {
				semaphore.releaseLock(lock)
				clearTimeout(this.warnIfHeldLongerTimeout)
			},
			type,
			forContext,
			obtainedAt: Date.now(),
		}

		const lockPromise = new Promise<SemaphoreLock>((res, rej) => {
			const waitTimeout = timeoutMs > 0 ? setTimeout(() => {
				const message = `Timeout waiting for ${type} semaphore${forContext ? ` for ${forContext}` : ''} after ${timeoutMs}ms`
				console.error(message)
				rej(message)
			}, timeoutMs) : undefined
			
			this.nextLocksGrantedSubject.pipe(
				filter(() => this.grantedLocks.includes(lock)),
				take(1),
				tap(() => {
					clearTimeout(waitTimeout)
					if(Semaphore.debugLevel) {
						if(type == 'exclusive') {
							console.log(`### Semaphore of type ${type} granted${forContext ? ` for ${forContext}` : ''}`)
						} else {
							console.log(`+++ Semaphore of type ${type} granted${forContext ? ` for ${forContext}` : ''}`)
						}
					}

					lock.warnIfHeldLongerTimeout = setTimeout(() => {
						console.warn(`Semaphore has held ${type} lock${forContext ? ` for ${forContext}` : ''} for ${Date.now() - lock.obtainedAt}ms`)
					}, Semaphore.warnIfHeldLongerThanMs)
					res(lock)
				})
			).subscribe(() => {})
		})

		this.lockQueue.push(lock)
		this.queueChangedSubject.next()
		this.tryToGrantNextLocks()

		return lockPromise
	}

	private releaseLock(lock: SemaphoreLock) {
		clearTimeout(lock.warnIfHeldLongerTimeout)
		this.grantedLocks = this.grantedLocks.filter(l => l !== lock)
		if(Semaphore.debugLevel) {
			const requestedExclusiveLocks = this.lockQueue.filter(l => l.type === 'exclusive')
			const requestedReadLocks = this.lockQueue.filter(l => l.type === 'read')
			console.log(`--- Semaphore of type ${lock.type} released${lock.forContext ? ` for ${lock.forContext}` : ''}; ${requestedExclusiveLocks.length} requested exclusive locks, ${requestedReadLocks.length} requested read locks`)
		}
		this.queueChangedSubject.next()
		this.tryToGrantNextLocks()
	}

	isLocked() {
		return this.grantedLocks.length > 0
	}

	waitForEmptyQueue() {
		return firstValueFrom(this.queueChangedSubject.pipe(
			filter(() => !this.lockQueue.length),
		))
	}
	
	waitForEmptyWriteQueue() {
		return firstValueFrom(this.queueChangedSubject.pipe(
			startWith(undefined),
			filter(() => !this.lockQueue.some(l => l.type === 'exclusive')),
		))
	}
	
	private tryToGrantNextLocks() {
		if(!this.lockQueue.length) return

		const grantedExclusiveLocks = this.grantedLocks.filter(l => l.type === 'exclusive')
		if(grantedExclusiveLocks.length) {
			if(Semaphore.debugLevel >= 2) console.log('    Exclusive lock is already granted, no more locks can be granted')
			return
		}

		if(!this.grantedLocks.length) {
			// no locks are granted yet, try to grant first exclusive lock
			const requestedExclusiveLocks = this.lockQueue.filter(l => l.type === 'exclusive')
			if(requestedExclusiveLocks.length) {
				// grant the first exclusive lock
				if(Semaphore.debugLevel >= 2) console.log('    Granting exclusive lock')
				this.grantLocks([requestedExclusiveLocks[0]])
				return
			}
		}
		
		// still no exclusive locks are granted, grant all read locks
		if(Semaphore.debugLevel >= 2) console.log('    No exclusive locks, granting all read locks')
		const requestedReadLocks = this.lockQueue.filter(l => l.type === 'read')
		this.grantLocks(requestedReadLocks)
	}

	private grantLocks(locks: SemaphoreLock[]) {
		this.grantedLocks.push(...locks)
		this.lockQueue = this.lockQueue.filter(l => !locks.includes(l))
		this.nextLocksGrantedSubject.next()
	}

	async withReadLock(options: ObtainLockOptions, fn: () => void | Promise<void>): Promise<void>
	async withReadLock(fn: () => void | Promise<void>): Promise<void>
	async withReadLock(fnOrOptions: ObtainLockOptions | (() => void | Promise<void>), fn?: () => void | Promise<void>) {
		const options = typeof fnOrOptions === 'object' ? fnOrOptions : undefined
		fn = fn ?? (fnOrOptions as () => void | Promise<void>)

		const lock = await this.obtainLock({ ...options, type: 'read' })
		try {
			await fn()
		} finally {
			lock.release()
		}
	}

	async withExclusiveLock<T>(options: ObtainLockOptions, fn: () => T | Promise<T>): Promise<T>
	async withExclusiveLock<T>(fn: () => T | Promise<T>): Promise<T>
	async withExclusiveLock<T>(fnOrOptions: ObtainLockOptions | (() => T | Promise<T>), fn?: () => T | Promise<T>) {
		const options = typeof fnOrOptions === 'object' ? fnOrOptions : undefined
		fn = fn ?? (fnOrOptions as () => T | Promise<T>)

		const lock = await this.obtainLock({ ...options, type: 'exclusive' })
		try {
			const result = await fn()
			return result
		} finally {
			lock.release()
		}
	}
}

// async function test() {
// 	Semaphore.debug = 2
// 	const semaphore = new Semaphore()
// 	const e1 = semaphore.obtainLock({ forContext: 'E1', type: 'exclusive' })
// 	const r1 = semaphore.obtainLock({ forContext: 'R1', type: 'read' })
// 	const r2 = semaphore.obtainLock({ forContext: 'R2', type: 'read' })
// 	const e2 = semaphore.obtainLock({ forContext: 'E2', type: 'exclusive' })
	
// 	e1.then(l => l.release())
// 	e2.then(l => l.release())
// 	r1.then(l => l.release())
// 	setTimeout(() => {
// 		const r3 = semaphore.obtainLock({ forContext: 'R3', type: 'read' })
// 		r2.then(l => l.release())
// 		r3.then(l => l.release())
// 	}, 1000)
// }
// test()