mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-07 06:02:07 +08:00
ignore read kafka-controller data when znode not exist
This commit is contained in:
@@ -19,13 +19,13 @@ import org.springframework.dao.DuplicateKeyException;
|
|||||||
* @date 20/5/14
|
* @date 20/5/14
|
||||||
*/
|
*/
|
||||||
public class ControllerStateListener implements StateChangeListener {
|
public class ControllerStateListener implements StateChangeListener {
|
||||||
private final static Logger LOGGER = LoggerFactory.getLogger(ControllerStateListener.class);
|
private static final Logger LOGGER = LoggerFactory.getLogger(ControllerStateListener.class);
|
||||||
|
|
||||||
private Long clusterId;
|
private final Long clusterId;
|
||||||
|
|
||||||
private ZkConfigImpl zkConfig;
|
private final ZkConfigImpl zkConfig;
|
||||||
|
|
||||||
private ControllerDao controllerDao;
|
private final ControllerDao controllerDao;
|
||||||
|
|
||||||
public ControllerStateListener(Long clusterId, ZkConfigImpl zkConfig, ControllerDao controllerDao) {
|
public ControllerStateListener(Long clusterId, ZkConfigImpl zkConfig, ControllerDao controllerDao) {
|
||||||
this.clusterId = clusterId;
|
this.clusterId = clusterId;
|
||||||
@@ -35,9 +35,12 @@ public class ControllerStateListener implements StateChangeListener {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init() {
|
public void init() {
|
||||||
processControllerChange();
|
if (!checkNodeExist()) {
|
||||||
|
LOGGER.warn("kafka-controller data not exist, clusterId:{}.", clusterId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
processControllerChange();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onChange(State state, String path) {
|
public void onChange(State state, String path) {
|
||||||
@@ -49,11 +52,20 @@ public class ControllerStateListener implements StateChangeListener {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOGGER.error("process controller state change failed, clusterId:{} state:{} path:{}.",
|
LOGGER.error("process controller state change failed, clusterId:{} state:{} path:{}.", clusterId, state, path, e);
|
||||||
clusterId, state, path, e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean checkNodeExist() {
|
||||||
|
try {
|
||||||
|
return zkConfig.checkPathExists(ZkPathUtil.CONTROLLER_ROOT_NODE);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOGGER.error("init kafka-controller data failed, clusterId:{}.", clusterId, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
private void processControllerChange() {
|
private void processControllerChange() {
|
||||||
LOGGER.warn("init controllerData or controller change, clusterId:{}.", clusterId);
|
LOGGER.warn("init controllerData or controller change, clusterId:{}.", clusterId);
|
||||||
ControllerData controllerData = null;
|
ControllerData controllerData = null;
|
||||||
|
|||||||
Reference in New Issue
Block a user