import {
  combineLatest,
  EMPTY,
  from,
  fromEvent,
  interval,
  merge,
  of,
} from "rxjs";
import {
  get as _get,
  omit,
  omitBy,
  isNil,
  isEmpty,
  isEqual,
  noop,
} from "lodash";
import { stringify } from "qs";

import {
  addMessages,
  setUnread,
  getRecent,
  OpenChatAction,
} from "shared/actions/chat";
import combineEpicsCatchAll from "shared/utils/epic";

import { get, endpoints } from "common/utils/api";

import { requestPermission, notify } from "shared/utils/notification";
import {
  debounceTime,
  delay,
  distinctUntilChanged,
  filter,
  map,
  mapTo,
  mergeMap,
  skip,
  startWith,
  switchMap,
  takeUntil,
  tap,
  throttleTime,
} from "rxjs/operators";
import { ActionsObservable, ofType, StateObservable } from "redux-observable";
import { RootState } from "store";
import { inMobileApp } from "common/utils/inMobileApp";
import { SetUserAction } from "common/actions/user";
import { messageActions } from "shared/reducers/chat/messages";
import {
  conversationActions,
  partChat,
} from "shared/reducers/chat/conversations";

// Issues a request for `url` through the shared `get` helper and never
// rejects: any failure is swallowed with `noop`, so the returned promise
// resolves to `undefined` when the request fails.
const requestHandler = (url: string) => {
  // Returning `true` tells `get` to ignore the response, `false` lets it pass
  // through. HTTP 403 is ignored on purpose: it means the session has expired
  // or, most commonly, the user logged out right as the request reached the
  // backend. The default handler would render the PAGE_LOAD_FAILED error modal
  // in those cases, which would be both confusing and unnecessary here.
  const responseHandler = (res: Response) => res.status === 403;

  return get(url, true, {}, { responseHandler }).catch(noop);
};

// Requests the messages endpoint of conversation `id`, with `params`
// serialized into the query string by `qs.stringify`.
const requestFn = (id: string, params: any) => {
  const messagesUrl = endpoints.messages(id);
  const query = stringify(params);
  return requestHandler(`${messagesUrl}?${query}`);
};

// Requests the recent-conversations endpoint with `count_only=true`.
const reqNewMsgCount = () => {
  const countOnlyUrl = `${endpoints.conversations.recent}?count_only=true`;
  return requestHandler(countOnlyUrl);
};

// Selector: `last_message_time` of the chat messages slice, or `undefined`
// when the `chat` slice is absent.
const latestMessageTime = (state: RootState) => {
  const { chat } = state;
  return chat?.messages.last_message_time;
};

// Selector: `first_message_time` of the chat messages slice, or `undefined`
// when the `chat` slice is absent.
const firstMessageTime = (state: RootState) => {
  const { chat } = state;
  return chat?.messages.first_message_time;
};

// Emits the result of a messages request for conversation `id` every 4000 ms.
// `getLatestMessageTime` is called on every tick so each request asks only for
// messages after the most recent one known at that moment; a nil value is
// dropped from the query instead of being sent.
const intervalRequests$ = (
  id: string,
  getLatestMessageTime: () => string | undefined,
) => {
  // Built fresh per tick: both the latest message time and the focus state
  // can change between requests.
  const buildParams = () =>
    omitBy(
      {
        after_time: getLatestMessageTime(),
        // `~~` turns the boolean into 0/1 for the query string.
        set_read: ~~(document.hasFocus() || inMobileApp()),
      },
      isNil,
    );

  const everyFourSeconds$ = interval(4000);

  return everyFourSeconds$.pipe(
    mergeMap(() => requestFn(id, buildParams()).catch(noop)),
  );
};

// Epic: on every OPEN_CHAT action, fetch the conversation's messages once and
// then keep polling for newer ones until the chat is parted or an
// UNAUTHORIZED action is seen. Non-empty responses become `addMessages`
// actions; an empty initial response becomes `setNoMessages`.
const chatPoll = (
  action$: ActionsObservable<OpenChatAction>,
  state$: StateObservable<RootState>,
) => {
  // Read from the current state on each poll tick, not once up front.
  const readLatestMessageTime = () => latestMessageTime(state$.value);

  return action$.pipe(
    ofType("OPEN_CHAT"),
    mergeMap(({ id }) => {
      const initialLoad = requestFn(id, {
        set_read: ~~(document.hasFocus() || inMobileApp()),
      });
      // Tag polled responses so empty ones can be told apart from an empty
      // initial load further down.
      const polled$ = intervalRequests$(id, readLatestMessageTime).pipe(
        map(res => Object.assign({ poll: true }, res)),
      );
      const stop$ = action$.ofType(partChat.type, "UNAUTHORIZED");

      return merge(initialLoad, polled$).pipe(takeUntil(stop$));
    }),
    map(res => {
      if (!isEmpty(res?.data)) {
        return addMessages(res);
      }
      // Empty poll responses map to null and are dropped by the filter below,
      // so setNoMessages is only emitted for the initial load.
      return res?.poll ? null : messageActions.setNoMessages();
    }),
    filter(Boolean),
  );
};

// Returns an observable of the user's activity state: `true` immediately and
// whenever a mousemove/keyup/touchstart event is seen on `document`, `false`
// once `timeout` ms pass without a new (throttled) activity emission.
// Consecutive duplicates are suppressed.
const userActiveIn = (timeout: number) => {
  const activityEvents = ["mousemove", "keyup", "touchstart"];

  const active = merge(
    ...activityEvents.map(eventName => fromEvent(document, eventName)),
  ).pipe(
    throttleTime(timeout / 10),
    mapTo(true),
    startWith(true),
  );

  // Each activity emission restarts the countdown to "inactive".
  const inactive = active.pipe(
    switchMap(() => of(false).pipe(delay(timeout))),
  );

  return merge(active, inactive).pipe(distinctUntilChanged());
};

// Timeout (ms) passed to userActiveIn: time without activity events before
// the user is reported as inactive.
const inactiveAfter = 60000;
// Period (ms) of the unread-messages poll timer.
const pollInterval = 10000;
// While the user is inactive, only every Nth poll tick triggers a request.
const multiplierWhenInactive = 6;

// Polls the unread-conversation count and emits a `setUnread` action whenever
// the result is non-empty and differs from what is already in the store.
// Requests go out on every poll tick while the user is active, and only on
// every `multiplierWhenInactive`-th tick otherwise. Polling stops for good
// once an UNAUTHORIZED action is seen.
const unreadMessagesPoll = (
  action$: ActionsObservable<SetUserAction>,
  state$: StateObservable<RootState>,
) => {
  const activity$ = userActiveIn(inactiveAfter);
  const tick$ = interval(pollInterval).pipe(startWith(0));

  return combineLatest([activity$, tick$]).pipe(
    filter(
      ([userIsActive, tick]) =>
        userIsActive || tick % multiplierWhenInactive === 0,
    ),
    filter(() => Boolean(state$.value.user.conversations_enabled)),
    switchMap(() => reqNewMsgCount()),
    filter<any>(Boolean), // a failed request resolves to undefined
    map(unreadByConversation => {
      // NOTE(review): this path does not go through `chat`, unlike the other
      // state reads in this file — confirm it matches RootState.
      const openConversationId = _get(
        state$.value,
        "conversations.conversation.id",
        "",
      );
      return omit(unreadByConversation, openConversationId);
    }),
    map(setUnread),
    filter(unreadAction => {
      if (isEmpty(unreadAction.payload)) {
        return false;
      }
      const currentUnread = state$.value.chat?.conversations.unread;
      return !isEqual(unreadAction.payload, currentUnread);
    }),
    takeUntil(action$.ofType("UNAUTHORIZED")),
  );
};

// Epic: (re)starts the unread-messages poll on every SET_USER action whose
// user type is "Client". Any other user type cancels a running poll, because
// switchMap switches to EMPTY.
const startUnreadMessagesPoll = (
  action$: ActionsObservable<any>,
  state$: StateObservable<RootState>,
) =>
  action$.pipe(
    ofType("SET_USER"),
    switchMap(setUserAction =>
      setUserAction.data.type === "Client"
        ? unreadMessagesPoll(action$, state$)
        : EMPTY,
    ),
  );

// Epic: reacts to changes of the unread-conversations data.
// For every `setUnread` action after the first one that carries unread data:
//   1. try to show a desktop notification with the localized
//      "conversations.new_message_alert" text, and
//   2. if the user is on the welcome page ("/") and conversations are enabled,
//      emit `getRecent` so the new-messages section is refreshed.
const unreadChange = (
  action$: ActionsObservable<any>,
  state$: StateObservable<RootState>,
) =>
  action$.pipe(
    ofType(conversationActions.setUnread.type),
    skip(1), // skip notification on initial load + double req when loading index
    // No notifying when the unreads get cleared. The unread data is read from
    // `payload`, which is where the rest of this file reads it on `setUnread`
    // actions. `data` is kept as a fallback for legacy-shaped actions; reading
    // only `data` made this filter reject every payload-shaped action.
    filter(a => !isEmpty(a.payload ?? a.data)),
    tap(() =>
      notify(state$.value.intl.messages["conversations.new_message_alert"]),
    ),
    filter(() => window.location.pathname === "/"),
    filter(() => Boolean(state$.value.user.conversations_enabled)),
    map(getRecent),
  );

// Epic: loads older messages for a conversation. `startLoadingMessages`
// actions are debounced by 300 ms; the action payload is used as the
// conversation id and the request asks for messages before the earliest one
// currently in the store. A newer request or a `partChat` action cancels an
// in-flight one. Responses without any messages emit nothing.
const loadHistory = (
  action$: ActionsObservable<any>,
  state$: StateObservable<RootState>,
) => {
  const hasMessages = (res: any) => Boolean(res?.data?.length);

  return action$.pipe(
    filter(messageActions.startLoadingMessages.match),
    debounceTime(300),
    switchMap(({ payload: conversationId }) => {
      const olderMessages = requestFn(conversationId, {
        before_time: firstMessageTime(state$.value),
      });
      return from(olderMessages).pipe(
        takeUntil(action$.ofType(partChat.type)),
      );
    }),
    filter(hasMessages),
    map(addMessages),
  );
};

// Runs once when this module is loaded: request permission to show desktop
// notifications, which are used to announce new messages.
requestPermission();

// Default export: the chat epics of this module combined into a single epic
// via combineEpicsCatchAll.
// ts-prune-ignore-next
export default combineEpicsCatchAll(
  startUnreadMessagesPoll,
  chatPoll,
  loadHistory,
  unreadChange,
);
