import {Observable, Subject} from 'rxjs';

interface QueueTask {
    task: () => Observable<any>;
    callback: (data: any) => void;
    error: (error: any) => void;

}
export abstract class BlockingQueueService {
    private queue: QueueTask[] = [];
    private isRunning = false;
    private queueChanged = new Subject<void>();

    protected enqueue(task: () => Observable<any>, callback: (data: any) => void, error: (data: any) => void): void {
        this.queue.push({task, callback, error});
        this.queueChanged.next();
        this.start();
    }

    protected start(): void {
        if (!this.isRunning && this.queue.length > 0) {
            this.isRunning = true;
            const {task, callback, error} = this.queue.shift() as QueueTask;
            task().subscribe({
                next: (data: any) => {
                    callback(data);
                },
                complete: () => {
                    this.isRunning = false;
                    this.start();
                },
                error: (ex: any) => {
                    error(ex);
                    this.isRunning = false;
                    this.start();
                }
            });
        }
    }

    public flush(): void {
        this.queue = [];
        this.isRunning = false;
        this.queueChanged.next();
    }

    public get queueChanged$(): Observable<void> {
        return this.queueChanged.asObservable();
    }
}
