storm-kafka示例详解_kafka读取的forcefromstart参数-程序员宅基地

技术标签: C_OHTERS  

一、原理介绍


本文内容参考:https://github.com/apache/storm/tree/master/external/storm-kafka#brokerhosts

(一)使用storm-kafka的关键步骤

1、创建ZkHosts

当storm从kafka中读取某个topic的消息时,需要知道这个topic有多少个分区,以及这些分区放在哪个kafka节点(broker)上,

ZkHosts就是用于这个功能。

关于kafka信息在zk中的内容请参考:http://blog.csdn.net/jinhong_lu/article/details/46653087

创建zkHosts有2种形式:

    public ZkHosts(String brokerZkStr, String brokerZkPath) 

    public ZkHosts(String brokerZkStr)

默认情况下,zk信息被放到/brokers中,此时可以使用第2种方式:

new ZkHosts("123.58.172.117:2181,123.58.172.98:2181,123.58.172.111:2181,123.58.172.114:2181,123.58.172.116:2181”)

若zk信息被放置在/kafka/brokers中,则可以使用:


    public ZkHosts("123.58.172.117:2181,123.58.172.98:2181,123.58.172.111:2181,123.58.172.114:2181,123.58.172.116:2181", /kafka"

或者直接:


new ZkHosts("123.58.172.117:2181,123.58.172.98:2181,123.58.172.111:2181,123.58.172.114:2181,123.58.172.116:2181/kafka”)

默认情况下,每60秒去读取一次kafka的分区信息,可以通过修改host.refreshFreqSecs来设置。

除了使用ZkHosts来读取分析信息外,storm-kafka还提供了一种静态指定的方法,如:

    Broker brokerForPartition0 = new Broker("localhost");//localhost:9092
    Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly
    Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string.
    GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
    partitionInfo.addPartition(0, brokerForPartition0);//mapping form partition 0 to brokerForPartition0
    partitionInfo.addPartition(1, brokerForPartition1);//mapping form partition 1 to brokerForPartition1
    partitionInfo.addPartition(2, brokerForPartition2);//mapping form partition 2 to brokerForPartition2
    StaticHosts hosts = new StaticHosts(partitionInfo);


由此可以看出,ZkHosts完成的功能就是指定了从哪个kafka节点读取某个topic的哪个分区。


2、创建KafkaConfig

(1)有2种方式创建KafkaConfig


    public KafkaConfig(BrokerHosts hosts, String topic)

    public KafkaConfig(BrokerHosts hosts, String topic, String clientId)

BrokerHosts就是上面创建的实例,topic就是要订阅的topic名称,clientId用于指定存放当前topic consumer的offset的位置,这个id 应该是唯一的,否则多个拓扑会引起冲突。

事实上,trident的offset并不保存在这个位置,见下面介绍。

真正使用时,有2种扩展,分别用于一般的storm以及trident。


(2)core storm

Spoutconfig is an extension of KafkaConfig that supports additional fields with ZooKeeper connection info and for controlling behavior specific to KafkaSpout. The Zkroot will be used as root to store your consumer's offset. The id should uniquely identify your spout.

public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);

public SpoutConfig(BrokerHosts hosts, String topic, String id);

In addition to these parameters, SpoutConfig contains the following fields that control how KafkaSpout behaves:

    

// setting for how often to save the current kafka offset to ZooKeeper
    public long stateUpdateIntervalMs = 2000;

    // Exponential back-off retry settings.  These are used when retrying messages after a bolt
    // calls OutputCollector.fail().
    // Note: be sure to set backtype.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent
    // resubmitting the message while still retrying.
    public long retryInitialDelayMs = 0;
    public double retryDelayMultiplier = 1.0;
    public long retryDelayMaxMs = 60 * 1000;

KafkaSpout 只接受 SpoutConfig作为参数

(3)TridentKafkaConfig,TridentKafkaEmitter只接受TridentKafkaConfig使用参数

trident消费kafka的offset位置是在建立拓扑中指定,如:

topology.newStream(testkafkaSpout).

则offset的位置为:

/transactional/test/coordinator/currtx


(4)KafkaConfig的一些默认参数

    

public int fetchSizeBytes = 1024 * 1024;
    public int socketTimeoutMs = 10000;
    public int fetchMaxWait = 10000;
    public int bufferSizeBytes = 1024 * 1024;
    public MultiScheme scheme = new RawMultiScheme();
    public boolean forceFromStart = false;
    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
    public long maxOffsetBehind = Long.MAX_VALUE;
    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
    public int metricsTimeBucketSizeInSecs = 60;

可以通过以下方式修改:

        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());


3、设置MultiScheme

MultiScheme用于指定如果处理从kafka中读取到的字节,同时它用于控制输出字段名称。

  public Iterable<List<Object>> deserialize(byte[] ser);

  public Fields getOutputFields();

默认情况下,RawMultiScheme读取一个字段并返回一个字节,而发射的字段名称为bytes。

可以通过SchemeAsMultiScheme和 KeyValueSchemeAsMultiScheme改变这种默认行为:

        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

上面的语句指定了将字节转化为字符。

同时建立拓扑时:

topology.newStream(“test", kafkaSpout).

        each(new Fields("str"), new FilterFunction(),new Fields("word”))….

会指定发射的字段名称为str。


4、创建Spout

(1)core storm

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

(2)trident

 OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);


5、建立拓扑:

(1)core-storm

builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 12);

kafka-reader指定了spout的名称,12指定了并行度。


(2)trident

topology.newStream(“test"kafkaSpout).

        each(new Fields("str"), new FilterFunction(),new Fields("word”))….

test指定了放置offset的位置,也就是txid的位置。

str指定了spout发射字段的名称。


完整示例:

Core Spout

BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);


Trident Spout

TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);



(二)当拓扑出错时,如何从上一次的kafka位置继续处理消息

1、KafkaConfig有一个配置项为KafkaConfig.startOffsetTime,它用于指定拓扑从哪个位置上开始处理消息,可取的值有3个:

(1)kafka.api.OffsetRequest.EarliestTime(): 从最早的消息开始

(2)kafka.api.OffsetRequest.LatestTime(): 从最新的消息开始,即从队列队伍最末端开始。

(3)根据时间点: 可以参阅 How do I accurately get offsets of messages for a certain timestamp using OffsetRequest? 的实现原理。

How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?

 

Kafka allows querying offsets of messages by time and it does so at segment granularity. The timestamp parameter is the unix timestamp and querying the offset by timestamp returns the latest possible offset of the message that is appended no later than the given timestamp. There are 2 special values of the timestamp - latest and earliest. For any other value of the unix timestamp, Kafka will get the starting offset of the log segment that is created no later than the given timestamp. Due to this, and since the offset request is served only at segment granularity, the offset fetch request returns less accurate results for larger segment sizes.

 

For more accurate results, you may configure the log segment size based on time (log.roll.ms) instead of size (log.segment.bytes). However care should be taken since doing so might increase the number of file handlers due to frequent log segment rolling.


2、由于运行拓扑时,指定了offset在zk中保存的位置,当出现错误时,可以找出offset

当重新部署拓扑时,必须保证offset的保存位置不变,它才能正确的读取到offset。

(1)对于core storm,就是

SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);

后2个参数不能变化

(2)对于trident而言,就是

topology.newStream(“test"kafkaSpout).

第1个参数不能变化。


3、也就是说只要拓扑运行过一次KafkaConfig.startOffsetTime,之后重新部署时均可从offset中开始。

再看看这2个参数

    public boolean forceFromStart = false;

    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

如果将forceFromStart(旧版本是ignoreZkOffsets)设置为true,则每次拓扑重新启动时,都会从开头读取消息。

如果为false,则:

第一次启动,从开头读取,之后的重启均是从offset中读取。

一般使用时,将数值设置为以上2个即可。



(三)结果写回kafka


如果想把结果写回kafka,并保证事务性,可以使用 storm.kafka.trident.TridentState, storm.kafka.trident.TridentStateFactory and storm.kafka.trident.TridentKafkaUpdater.



Writing to Kafka as part of your topology

You can create an instance of storm.kafka.bolt.KafkaBolt and attach it as a component to your topology or if you are using trident you can use storm.kafka.trident.TridentState, storm.kafka.trident.TridentStateFactory and storm.kafka.trident.TridentKafkaUpdater.

You need to provide implementation of following 2 interfaces


TupleToKafkaMapper and TridentTupleToKafkaMapper

These interfaces have 2 methods defined:

    K getKeyFromTuple(Tuple/TridentTuple tuple);

    V getMessageFromTuple(Tuple/TridentTuple tuple);

as the name suggests these methods are called to map a tuple to kafka key and kafka message. If you just want one field as key and one field as value then you can use the provided FieldNameBasedTupleToKafkaMapper.java implementation. In the KafkaBolt, the implementation always looks for a field with field name "key" and "message" if you use the default constructor to construct FieldNameBasedTupleToKafkaMapper for backward compatibility reasons. Alternatively you could also specify a different key and message field by using the non default constructor. In the TridentKafkaState you must specify what is the field name for key and message as there is no default constructor. These should be specified while constructing and instance of FieldNameBasedTupleToKafkaMapper.


KafkaTopicSelector and trident KafkaTopicSelector

This interface has only one method

public interface KafkaTopicSelector {

    String getTopics(Tuple/TridentTuple tuple);

}

The implementation of this interface should return topic to which the tuple's key/message mapping needs to be published You can return a null and the message will be ignored. If you have one static topic name then you can use DefaultTopicSelector.java and set the name of the topic in the constructor.


Specifying kafka producer properties

You can provide all the produce properties , see http://kafka.apache.org/documentation.html#producerconfigs section "Important configuration properties for the producer", in your storm topology config by setting the properties map with key kafka.broker.properties.



附带2个官方的示例

For the bolt :

        

TopologyBuilder builder = new TopologyBuilder();

        Fields fields = new Fields("key", "message");
        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
                    new Values("storm", "1"),
                    new Values("trident", "1"),
                    new Values("needs", "1"),
                    new Values("javadoc", "1")
        );
        spout.setCycle(true);
        builder.setSpout("spout", spout, 5);
        KafkaBolt bolt = new KafkaBolt()
                .withTopicSelector(new DefaultTopicSelector("test"))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
        builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");

        Config conf = new Config();
        //set producer properties.
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);

        StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());

For Trident:

        

Fields fields = new Fields("word", "count");
        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
                new Values("storm", "1"),
                new Values("trident", "1"),
                new Values("needs", "1"),
                new Values("javadoc", "1")
        );
        spout.setCycle(true);

        TridentTopology topology = new TridentTopology();
        Stream stream = topology.newStream("spout1", spout);

        TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
                .withKafkaTopicSelector(new DefaultTopicSelector("test"))
                .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
        stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());

        Config conf = new Config();
        //set producer properties.
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
        StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());


二、示例介绍

(一)简介

1、本项目完整代码请见https://github.com/jinhong-lu/stormkafkademo/tree/master/src/main/java/org/jinhong/demo/storm_kafka/trident。

2、本项目主要完成以下功能:

(1)从kafka中读取一个topic的消息,然后根据空格拆分单词,最后统计数据后写入一个HazelCastState(一个分布式的内存存储框架)。

(2)通过DRPC从上述的HazelCastState中读取结果,并将结果输出。

3、代码可分为3部分:

(1)单词拆分

(2)定义拓扑行为

(3)state定义

以下分为三部分分别介绍。


(二)单词拆分

原理很简单,就是通过空格将单词进行拆分。

<span style="font-family:Courier New;font-size:14px;">public class WordSplit extends BaseFunction {
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String sentence = (String) tuple.getValue(0);
        if (sentence != null) {
            sentence = sentence.replaceAll("\r", "");
            sentence = sentence.replaceAll("\n", "");
            for (String word : sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }
    }
}</span>

这里的wordsplit是一个function,它继承自BaseFunction,最后,它将拆分出来的单词逐个emit出去。


(三)定义拓扑行为

1、定义kafka的相关配置

        

<span style="font-family:Courier New;font-size:14px;">        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "storm-sentence", "storm");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);
</span>

<span style="font-family:Courier New;font-size:14px;"></pre><p></p><p class="p3">(1)首先定义一个kafka相关的配置对象,第一个参数是zookeeper的位置,第二个参数是订阅topic的名称,第三个参数是一个clientId(2)然后对配置进行一些设置,包括一些起始位置之类的,后面再补充具体的配置介绍。(3)创建一个spout,这里的spout是事务型的,也就是保证每一个</p><p class="p1">2、定义拓扑,进行单词统计后,写入一个分布式内存中。</p><p class="p2"></p><pre name="code" class="java">                TridentTopology >topology= new TridentTopology();</span>
<span style="font-family:Courier New;font-size:14px;">	TridentState wordCounts = topology.newStream("kafka", kafkaSpout).shuffle().
                each(new Fields("str"), new WordSplit(), new Fields("word")).
                groupBy(new Fields("word")).
                persistentAggregate(new HazelCastStateFactory(), new Count(), new Fields("aggregates_words")).parallelismHint(2);</span>
(1)创建一个topo。

(2)首先定义一个输入流,其中第一个参数定义了zk中放置这个topo元信息的信息,一般是/transactional/kafka

(3)对每个输入的消息进行拆分:首先它的输入是字段名称为str的消息,然后经过WordSplit这个Function处理,最后,以字段名称word发送出去

(4)将结果根据word字段的值进行分组,就是说word值相同的放在一起。

(5)将分组的结果分别count一下,然后以字段名称aggregates_words写入HazelCastStateFactory定义的state中,关于state请见下一部分的介绍。


  3、从分布式内存中读取结果并进行输出

<span style="font-family:Courier New;font-size:14px;">        topology.newDRPCStream("words", drpc)
                .each(new Fields("args"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
                .each(new Fields("count"), new FilterNull())
                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));</span>

(1)第三行定义了使用drpc需要处理的内容

(2)查询分布式内存中的内容,查询字段为word,然后以字段名count发送出去。

(3)将不需要统计的过滤掉。

(4)将结果进行聚合。


4、主函数

<span style="font-family:Courier New;font-size:14px;">       String kafkaZk = args[0];
        SentenceAggregationTopology sentenceAggregationTopology = new SentenceAggregationTopology(kafkaZk);
        Config config = new Config();
        config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);

        if (args != null && args.length > 1) {
            String name = args[1];
            String dockerIp = args[2];
            config.setNumWorkers(2);
            config.setMaxTaskParallelism(5);
            config.put(Config.NIMBUS_HOST, dockerIp);
            config.put(Config.NIMBUS_THRIFT_PORT, 6627);
            config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
            config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(dockerIp));
            StormSubmitter.submitTopology(name, config, sentenceAggregationTopology.buildTopology());
        } else {
            LocalDRPC drpc = new LocalDRPC();
            config.setNumWorkers(2);
            config.setMaxTaskParallelism(2);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafka", config, sentenceAggregationTopology.buildTopology(drpc));
            while (true) {
                System.out.println("Word count: " + drpc.execute("words", "the"));
                Utils.sleep(1000);
            }

        }</span>

三个参数的含义为:

   /* args[0]:kafkazk,如:192.168.172.98:2181,192.168.172.111:2181,192.168.172.114:2181,192.168.172.116:2181,192.168.172.117:2181/kafka

     * args[1]:topo名称

     * args[2]:niubus节点,如,192.168.172.98

     */

当参数数据大于1时,将拓扑提交到集群中,否则提交到本地。提交拓扑到集群的比较直观,下面郑重介绍一下drpc的查询。

(1)首先定义一个本地的drpc对象,以及一个本地storm集群。

(2)然后将拓扑群提交到本地集群。

(3)最后,使用drpuc不停的循环查询统计结果并输出。


注意上面的拓扑定义了2个流,第一个流用于接收kafka消息,然后拆分统计后写入内存,第二个流则接受drpc的输入,将drpc的输入拆分后,再统计需要查询的每个单词的统计结果。如在本例中,需要显示单词the的数量。

在本例中,drpc和kafka没有本质的区别,它们都是一个用于向storm发送消息的集群,只是输入数据的方式有些不同,kafka通过spout输入,drpc则直接通过execute()进行输入。


运行方式:

方式一:直接在eclipse右键运行,参数只填一个,如123.58.172.98:2181,123.58.172.111:2181,123.58.172.114:2181,123.58.172.116:2181,123.58.172.117:2181/kafka。只要保证kafka集群中有对应的topic,则会得到以下输出:

Word count: [[2]]

Word count: [[5]]

Word count: [[10]]

Word count: [[17]]

Word count: [[28]]

当然,统计结果根据输入kafka的内容而不同。


(四)state定义

在定义拓扑的时候,最终的wordcount结果写在了HazelCastState中:

persistentAggregate(new HazelCastStateFactory(),new Count(), new Fields("aggregates_words"))

下面我们分析一下如何使用state来保存topo的处理结果,或者是中间处理结果。

注意,使用state除了可以保存最终的结果输出,以保证事务型、透明事务型以外,还经常用于保存中间结果。比如blueprint第3章的一个例子中,用于统计疾病的发生数量,如果超过预警值,则向外发信息。如果统计结果成功,但向外发送信息失败,则spout会重发数据,导致统计结果有误,因此,此时可以通过state将结果保存下来。


1、Factory类

<span style="font-family:Courier New;font-size:14px;">public class HazelCastStateFactory implements StateFactory {
    @Override
    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
        return TransactionalMap.build(new HazelCastState(new HazelCastHandler()));
    }
}
</span>


内容很简单,就是返回一个state,它也是三个state相关的类中唯一对外的接口。


2、Handler类

<span style="font-family:Courier New;font-size:14px;">public class HazelCastHandler implements Serializable {

    private transient Map<String, Long> state;

    public Map<String, Long> getState() {
        if (state == null) {
            state = Hazelcast.newHazelcastInstance().getMap("state");
        }
        return state;
    }</span>

使用单例模式返回一个map。


3、State类

真正处理业务逻辑的类。主要的方法有mutiPut和mutiGet,用于将结果放入state与取出state。


<span style="font-family:Courier New;font-size:14px;">public class HazelCastState<T> implements IBackingMap<TransactionalValue<Long>> {

    private HazelCastHandler handler;


    public HazelCastState(HazelCastHandler handler) {
        this.handler = handler;
    }

    public void addKeyValue(String key, Long value) {
        Map<String, Long> state = handler.getState();
        state.put(key, value);
    }

    @Override
    public String toString() {
        return handler.getState().toString();
    }


    @Override
    public void multiPut(List<List<Object>> keys, List<TransactionalValue<Long>> vals) {
        for (int i = 0; i < keys.size(); i++) {
            TridentTuple key = (TridentTuple) keys.get(i);
            Long value = vals.get(i).getVal();
            addKeyValue(key.getString(0), value);
            //System.out.println("[" + key.getString(0) + " - " + value + "]");
        }
    }


    public List multiGet(List<List<Object>> keys) {
        List<TransactionalValue<Long>> result = new ArrayList<TransactionalValue<Long>>(keys.size());
        for (int i = 0; i < keys.size(); i++) {
            TridentTuple key = (TridentTuple) keys.get(i);
            result.add(new TransactionalValue<Long>(0L, MapUtils.getLong(handler.getState(), key.getString(0), 0L)));
        }
        return result;
    }
}</span>






版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/jinhong_lu/article/details/46766195

智能推荐

Mendix客户项目总结_mendix项目-程序员宅基地

文章浏览阅读983次。​从2005年成立至今,Mendix一直以客户价值为导向精心打磨产品,提升客户、合作伙伴及所有参与方的使用体验。进入中国以来,我们更是以120%的态度践行这一原则。工欲善其事必先利其器,这里的“器”不仅仅在于产品本身,更在于产品背后代表的思维方式,实施合作伙伴的培养与赋能,解决方案架构,项目管理与执行,客户数字化转型的长远的计划与思考等等。只有不断地用先进的理论指导实践,并在实践中迭代成长,才能充分发挥先进生产工具的最大价值,创造多方的共赢局面。因此,我们会建议Mendix客户针对第一期的开发项目认真地_mendix项目

MATLAB修改字体大小,加粗,字体样式,字体颜色,线形,希腊字母,特殊符号,图片文本位置及字体设置等_matlab字体-程序员宅基地

文章浏览阅读2.3w次,点赞41次,收藏255次。MATLAB修改字体大小,加粗,字体样式,字体颜色,线形颜色格式,添加标签,修改标签格式,希腊字母表,图片文本位置等_matlab字体

Spring环境配置(安装springsource-tool-suite插件)及第一个Spring HelloWorld-程序员宅基地

文章浏览阅读3.6k次。SpringSoource-tool-suite安装为了使用Spring的方便,首先Eclipse安装插件SpringSoource-tool-suite(SpringSoource-tool-suite 是一个基于EclipseIDE开发环境中的用于开发Spring应用程序的插件。利用这个插件,可以方便的在Eclipse平台上开发基于Spring框架的应用)。如下是SpringSoource-to_springsource-tool-suite

mysql 效率监控_zabbix监控mysql性能-程序员宅基地

文章浏览阅读156次。通过获取mysql状态值将这些状态值传递给服务器并绘制成图片,这样可以观察mysql的工作情况,通常需要获得状态变量有以下Com_update:mysql执行的更新个数Com_select:mysql执行的查询个数Com_insert:mysql执行插入的个数Com_delete:执行删除的个数Com_rollback:执行回滚的操作个数Bytes_received:接受的字节数Bytes_sen..._zabbix slow queries over 3 for 5m

正交法设计用例_四阶正交拉丁方阵-程序员宅基地

文章浏览阅读4.6k次,点赞3次,收藏14次。正交实验法的由来一、正交表的由来拉丁方名称的由来 古希腊是一个多民族的国家,国王在检阅臣民时要求每个方队中每行有一个民族代表,每列也要有一个民族的代表。数学家在设计方阵时,以每一个拉丁字母表示一个民族,所以设计的方阵称为拉丁方。什么是n阶拉丁方?用n个不同的拉丁字母排成一个n阶方阵(n<26 ),如果每行的n个字母均不相同,每列的n个字母均不相同,则称这种方阵为n*n拉丁方或n阶拉丁方。每个字母在任一行、任一列中只出现一次。_四阶正交拉丁方阵

细粒度情感分析在到餐场景中的应用_absa数据标注工具-程序员宅基地

文章浏览阅读3.6k次。总第482篇2021年 第052篇经典的细粒度情感分析(ABSA,Aspect-based Sentiment Analysis)主要包含三个子任务,分别为属性抽取、观点抽取以及属性-观点..._absa数据标注工具

随便推点

PWA登陆iOS了,但它还有这些缺陷_pwa的缺点-程序员宅基地

文章浏览阅读2.1k次。Apple 在 iOS 11.3 中悄悄加入了对“渐进式 Web 应用”(PWA)这一系列新技术的基本支持。是时候看看这些技术是如何生效的?它有什么能力?会遇到哪些挑战?以及如果已经发布了 PWA,又需要了解哪些事情?本文概括介绍了最新发布的 iOS 11.3 对 PWA 的支持情况,以及 PWA 应用开发者需要注意的问题。本文转载自前端之巅作者 Maximiliano Firtman..._pwa的缺点

Qt 加载图片文件路径详解_qt资源图片路径-程序员宅基地

文章浏览阅读7.6k次,点赞3次,收藏23次。QT 加载文件,图片路径很容易搞混,需要注意的是WINDOW路径分隔符为“\”,QT为“/”,我遇到的路径加载总结为三种情况:(1)绝对路径,文件的整个路径,比如 setWindowIcon(QIcon("F:/QT_PROJECT/QTtest/test/res/123.jpg"));//加载图片绝对路径(2)相对路径 1.第一种情况,新建QT 资源文件,也就..._qt资源图片路径

wanchain是个什么样的项目_wanchain与delphy都是同一个团队的项目吗?你们是不是来圈钱的?-程序员宅基地

文章浏览阅读1.4k次。万维链旨在建立一个分布式的未来“银行”,万维链本身是一个分布式的基于数字资产的金融基础设施,任何机构和个人,都可以在万维链上开设自己的业务窗口,提供基于数字资产的存贷、兑换、支付、结算等服务。更加准确的描述,万维链是一个基于区块链的分布式超级金融市场。_wanchain与delphy都是同一个团队的项目吗?你们是不是来圈钱的?

Linux 系统下搭建 Gitlab 服务器-程序员宅基地

文章浏览阅读2.7k次。1、安装依赖工具// 安装技术依赖yum install -y curl policycoreutils-python openssh-server // 启动ssh服务/设置为开机启动sudo systemctl enable sshdsudo systemctl start sshd2、安装 Postfix 邮件服务器// 安装 postfixsudo yum ins..._linux 安装gitlab服务器

LiveNVR配置拉转RTSP传统海康大华安防摄像机直播流输出RTSP/RTMP/HLS/HTTP-FLV如何获取直播流地址_公网 支持rtsp、http的摄像头-程序员宅基地

文章浏览阅读1.4k次。LiveNVR配置拉转RTSP传统海康大华安防摄像机直播流输出RTSP/RTMP/HLS/HTTP-FLV如何获取直播流地址1、 Onvif/RTSP流媒体服务2、配置拉转直播流2.1 RTSP获取配置规则2.2 编辑通道配置3、接口获取视频流地址3.1、获取通道直播链接接口3.2、获取HTTP-FLV播放地址示例3.3、获取WS-FLV播放地址示例3.3、获取RTMP播放地址示例3.4、获取HLS播放地址示例3.4、获取RTSP播放地址示例4、浏览器F12查看播放地址5、播放页面快速集成1、 Onvif_公网 支持rtsp、http的摄像头

Python 爬取qqmusic音乐url并批量下载_qq音乐源url怎么下载-程序员宅基地

文章浏览阅读1.7k次,点赞3次,收藏13次。使用Python爬虫批量下载音乐_qq音乐源url怎么下载