Clients / Redux

redux-lenses-streaming is a redux middleware that facilitates connection to Lenses. You can build frontend applications for Apache Kafka. The library utilizes the WebSocket APIs of Lenses.

For a demo application and usage read the following blog post: http://www.landoop.com/blog/2017/12/redux-lenses-streaming/

(Tested with v3.7)

Install

npm i --save redux-lenses-streaming rxjs

The only peer dependency that the library has is rxjs.

Usage

First setup the redux store with the Kafka middleware:

import { createLensesMiddleware } from 'redux-lenses-streaming';

function configureStore() {
  const lensesMiddleware = createLensesMiddleware();
  //Any other middleware you might use
  const logger = createLogger();
  const middleware = [logger, lensesMiddleware];

  const store = createStore(
    rootReducer,
    applyMiddleware(...middleware),
  );

  return store;
}

Or you can customize the middleware with custom options, in which case it will attempt to connect:

import { createLensesMiddleware } from 'redux-lenses-streaming';

function configureStore() {
  const lensesOptions = {
    host: 'cloudera02.landoop.com:24006/api/kafka/ws',
    clientId: 'MyClientsName',
    // See Options section for full list
  };

  const lensesMiddleware = createLensesMiddleware(lensesOptions);
  const middleware = [..., lensesMiddleware];

  const store = createStore(
    rootReducer,
    applyMiddleware(...middleware),
  );

  return store;
}

After that, add the reducer:

import { combineReducers } from 'redux';
import { lensesReducer } from 'redux-lenses-streaming';
import sessionReducer from './sessionReducer';

const rootReducer = combineReducers({
  lenses: lensesReducer,
  //Add other application reducers where you listen
  // to the exposed actions (see list below)
  session: sessionReducer,
});

export default rootReducer;

Now you are ready to dispatch Actions using the provided action creators, that the middleware will intercept:

import { Actions } from 'redux-lenses-streaming';

dispatch(Actions.connect(options));
dispatch(Actions.login(options));
dispatch(Actions.publish(payload));
dispatch(Actions.subscribe(payload));
dispatch(Actions.unsubscribe(payload));
dispatch(Actions.disconnect());
...

You can also listen to various Action Types, dispatched by the middleware:

import { Type } from 'redux-lenses-streaming';

export const Type = {
  KAFKA_MESSAGE
  KAFKA_HEARTBEAT
  CONNECT
  CONNECT_SUCCESS
  CONNECT_FAILURE
  DISCONNECT
  DISCONNECT_SUCCESS
  DISCONNECT_FAILURE
  LOGIN
  LOGIN_SUCCESS
  LOGIN_FAILURE
  PUBLISH
  PUBLISH_SUCCESS
  PUBLISH_FAILURE
  SUBSCRIBE
  SUBSCRIBE_SUCCESS
  SUBSCRIBE_FAILURE
  UNSUBSCRIBE
  UNSUBSCRIBE_SUCCESS
  UNSUBSCRIBE_FAILURE
};
...

Options

const defaultOptions = {
  host: '',
  clientId: '',
  user: '',
  password: '',
  secure: false,
  timeout: 5000,
  autoCommitDelay: -1,
};

// For login action creator:
const options = {
  user: '',
  password: '',
};
host

Web socket address, including the port. If wss:// is not set, it will be added by the library. Example of address:

test.landoop.com:21112/api/kafka/ws wss://test.landoop.com:21112/api/kafka/ws

  • Default:
clientId

Client Id string. If the previous session found, it will send back messages on topic subscription.

  • Default:
user

User for authentication.

  • Default:
password

Password for authentication.

  • Default:
timeout

Timeout (ms) before publish / subscribe calls fail.

  • Default: 5000
autoCommitDelay

Delay (ms) for auto-committing last message. If -1, then you will need to manually send commit message.

  • Default: -1
secure
Force connection to wss.