Kafka ベースのシステムで Shed Lock を使用するにはどうすればよいですか?

Oct 29, 2025

伝言を残す

アマンダ・リー
アマンダ・リー
サステナビリティアナリストとして、私は生産プロセスに環境に優しいプラクティスの実施に焦点を当てています。 Lianhuでは、環境フットプリントを最小限に抑えることに取り組んでいます。

ちょっと、そこ!私は Shed Locks のサプライヤーです。今日は、Kafka ベースのシステムで Shed Locks を使用する方法についてお話したいと思います。少し専門的に聞こえるかもしれませんが、簡単に説明しますので、心配しないでください。

まず最初に、Kafka と Shed Lock とは何なのかを簡単に説明します。 Kafka は、リアルタイム データ フィードの処理で非常に人気のある分散ストリーミング プラットフォームです。これにより、レコードのストリームを公開、購読、保存、処理できます。一方で、シェッドロックは、分散システムでタスクのインスタンスを一度に 1 つだけ実行するようにする場合に優れたツールです。

では、なぜ Kafka ベースのシステムで Shed Lock を使用したいのでしょうか? Kafka セットアップでは、トピックからのメッセージを処理する複数のコンシューマが存在することがよくあります。場合によっては、同時に実行すべきではないタスクがある場合があります。たとえば、Kafka メッセージに基づいて共有リソースを更新するタスクを作成できます。このタスクの複数のインスタンスが同時に実行されると、データの不整合やその他の問題が発生する可能性があります。そこで便利なのがシェッドロックです。

Shed Lock と Kafka の統合

Kafka ベースのシステムで Shed Lock を使用する最初のステップは、必要な依存関係を追加することです。 Java プロジェクトを使用している場合は、Shed Lock 依存関係をpom.xmlMaven を使用している場合。

<dependency> <groupId>net.javacrumbs.shedlock</groupId> <artifactId>shedlock-spring</artifactId> <version>4.44.0</version> </dependency>

依存関係を追加したら、ロック プロバイダーを構成する必要があります。データベース ベースや Redis ベースなど、さまざまなタイプのロック プロバイダーを使用できます。話を簡単にするために、データベースベースのロックプロバイダーを使用していると仮定します。ロック情報を保存するには、データベースにテーブルを設定する必要があります。

CREATE TABLE shedlock( name VARCHAR(64), lock_until TIMESTAMP(3) NULL, locked_at TIMESTAMP(3) NULL, locked_by VARCHAR(255), PRIMARY KEY (name) );

Spring Boot アプリケーションでは、次のようにロック プロバイダーを構成できます。

net.javacrumbs.shedlock.core.LockProvider をインポートします。 net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider をインポートします。 org.springframework.context.annotation.Bean をインポートします。 org.springframework.context.annotation.Configuration をインポートします。 org.springframework.jdbc.core.JdbcTemplate をインポートします。インポートjavax.sql.DataSource; @Configuration public class ShedLockConfig { @Bean public LockProvider lockProvider(DataSource dataSource) { return new JdbcTemplateLockProvider( JdbcTemplateLockProvider.Configuration.builder() .withJdbcTemplate(new JdbcTemplate(dataSource)) .usingDbTime() 。建てる() ); } }

Kafka コンシューマーでの Shed Lock の使用

Shed Lock のセットアップが完了したので、Kafka コンシューマーでそれをどのように使用できるかを見てみましょう。トピックからのメッセージを処理し、共有リソースを更新する Kafka コンシューマーがあるとします。使用できます@SchedulerLockタスクのインスタンスが一度に 1 つだけ実行されるようにするために、Shed Lock によって提供されるアノテーション。

net.javacrumbs.shedlock.spring.annotation.SchedulerLock をインポートします。 org.springframework.kafka.annotation.KafkaListener をインポートします。 org.springframework.stereotype.Service をインポートします。 @Service public class KafkaConsumerService { @KafkaListener(topics = "your - topic", groupId = "your - group - id") @SchedulerLock(name = "kafkaConsumerTask", lockAtMostFor = "PT10M", lockAtLeastFor = "PT1M") public void ConsumerMessage(String message) { // メッセージを処理し、共有リソースを更新する System.out.println("受信メッセージ: " + メッセージ); } }

上記のコードでは、@SchedulerLock注釈により、消費メッセージメソッドは少なくとも 1 分間、最大で 10 分間ロックされます。の名前属性はロックを識別するために使用されます。

考慮事項とベストプラクティス

Kafka ベースのシステムで Shed Lock を使用する場合、留意すべき点がいくつかあります。

21-62-2

  • ロック期間: 慎重に選択する必要があります。ロックアットモスト用そして少なくともロック価値観。もしロックアットモスト用値が短すぎると、タスクが完了する前にロックが解放され、複数のインスタンスが同時に実行される可能性があります。長すぎると、不必要な遅延が発生する可能性があります。
  • エラー処理: 適切なエラー処理を導入する必要があります。ロックされたタスクの実行中にエラーが発生した場合、ロックは正常に解放される必要があります。そうしないと、デッドロック状況が発生する可能性があります。
  • スケーラビリティ: Kafka システムが拡張するにつれて、ロック プロバイダーが増加した負荷を処理できることを確認してください。たとえば、データベース ベースのロック プロバイダーを使用している場合は、データベースが同時ロック要求を処理できることを確認してください。

さまざまな使用例に対応する他のタイプのロック

その間シェッドロックこれは、単一インスタンスのタスクの実行を保証するのに最適ですが、Kafka ベースのシステムで役立つ可能性のある他のタイプのロックもあります。たとえば、Kafka インフラストラクチャが配置されているデータ センターのドアやエンクロージャの物理的なロックに対処している場合は、次のことを検討するとよいでしょう。クォーターターンロックまたは丸型ドアロック。これらのロックは、機器のセキュリティをさらに強化します。

結論

Kafka ベースのシステムで Shed Lock を使用すると、タスクの同時実行によって発生する可能性のあるデータの不整合やその他の問題を回避できます。上記の手順に従うことで、Shed Lock を Kafka アプリケーションに簡単に統合できます。当社の高品質 Shed Lock の購入にご興味がある場合、または Kafka システムにどのように適合させるかについてご質問がある場合は、調達に関するディスカッションにお気軽にお問い合わせください。お客様のニーズに最適なソリューションを見つけるお手伝いをいたします。

参考文献

  • Spring Boot ドキュメント
  • カフカのドキュメント
  • Shed Lock GitHub リポジトリ
お問い合わせを送る