import { isFunction, isNumber } from 'lodash';
import { Observable, pipe, timer } from 'rxjs';
import { concatMap, delay, filter, retryWhen, share, take, takeUntil, takeWhile } from 'rxjs/operators';

/**
 * Default value for the `interval` parameter of `observableRepeater`.
 * It is passed to RxJS `timer` and `delay`, so the unit is milliseconds.
 */
const DEFAULT_INTERVAL = 2000;

/**
 * Repeatedly subscribes to `observable` on a fixed interval (polling) and
 * forwards its emissions until an end condition is met.
 *
 * The first subscription happens immediately (`timer(0, interval)`), then one
 * per `interval`. Ticks are processed with `concatMap`, so a tick that fires
 * while the previous subscription to `observable` is still active is queued
 * and runs only after that subscription completes.
 *
 * @param observable - The source to (re)subscribe to on every tick.
 * @param endNotifier - Either a predicate or a notifier observable.
 *   - Predicate: polling stops as soon as it returns a truthy value; the value
 *     that triggered the stop is still emitted (inclusive `takeWhile`).
 *     The predicate is expected to return a boolean; the wider return type is
 *     kept only for backward compatibility with existing callers.
 *   - Observable: polling stops when the notifier emits (`takeUntil`).
 * @param filterResults - When a function is given, only values for which it
 *   returns a truthy value are emitted. Any non-function value disables
 *   filtering. The filter runs after the end check, so the final value can be
 *   filtered out as well.
 * @param retryOnError - When exactly `true`, errors are swallowed and the whole
 *   polling chain is resubscribed after `interval` milliseconds.
 *   NOTE: resubscribing restarts the timer and the `maxAttempts` counter, so
 *   `maxAttempts` does not bound the number of attempts across errors.
 * @param interval - Polling period and retry delay, in milliseconds.
 * @param maxAttempts - Optional cap on the number of ticks per subscription of
 *   the chain. Ignored unless it is a number.
 * @returns A shared (multicast) observable of the values emitted by
 *   `observable`.
 */
export const observableRepeater = <K>(
  observable: Observable<K>,
  endNotifier: ((value: K) => boolean | Observable<any>) | Observable<any>,
  filterResults: boolean | any = false,
  retryOnError: boolean = true,
  interval: number = DEFAULT_INTERVAL,
  maxAttempts?: number,
): Observable<K> => {
  // Build the chain step by step so every stage stays type-checked; the
  // operator order is the same as a single composed pipe.
  let ticks$: Observable<number> = timer(0, interval);

  if (isNumber(maxAttempts)) {
    ticks$ = ticks$.pipe(take(maxAttempts));
  }

  let results$: Observable<K> = ticks$.pipe(concatMap(() => observable));

  if (isFunction(endNotifier)) {
    // Capture the narrowed predicate so the closure below keeps its type.
    const shouldStop = endNotifier;
    // `inclusive = true`: emit the value that satisfied the predicate, then complete.
    results$ = results$.pipe(takeWhile((data: K) => !shouldStop(data), true));
  } else {
    results$ = results$.pipe(takeUntil(endNotifier));
  }

  if (isFunction(filterResults)) {
    results$ = results$.pipe(filter((data: K) => filterResults(data)));
  }

  if (retryOnError === true) {
    // Resubscribes to everything above (timer, take, concatMap, ...) after the delay.
    results$ = results$.pipe(retryWhen((errors$) => errors$.pipe(delay(interval))));
  }

  return results$.pipe(share());
};
